mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-13 08:15:14 +02:00
feat(historic-sql): use shared readiness probes
This commit is contained in:
parent
b5b1dc056f
commit
3677b1fb0c
12 changed files with 1337 additions and 456 deletions
157
packages/cli/src/context/ingest/historic-sql-probes.test.ts
Normal file
157
packages/cli/src/context/ingest/historic-sql-probes.test.ts
Normal file
|
|
@ -0,0 +1,157 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
import type { HistoricSqlDialect } from './adapters/historic-sql/types.js';
|
||||
import {
|
||||
historicSqlProbeCatalogName,
|
||||
runHistoricSqlReadinessProbe,
|
||||
type HistoricSqlProbeRunner,
|
||||
type HistoricSqlProbeRunnerFactoryEntry,
|
||||
} from './historic-sql-probes.js';
|
||||
|
||||
function fakeRunner(
|
||||
dialect: HistoricSqlDialect,
|
||||
catalogName: string,
|
||||
options: { result?: unknown; error?: unknown } = {},
|
||||
): HistoricSqlProbeRunner & { runCalls: () => number } {
|
||||
let calls = 0;
|
||||
return {
|
||||
dialect,
|
||||
catalogName,
|
||||
async run() {
|
||||
calls += 1;
|
||||
if (options.error) {
|
||||
throw options.error;
|
||||
}
|
||||
return options.result ?? { warnings: [], info: [] };
|
||||
},
|
||||
formatSuccessDetail() {
|
||||
return { detail: `${catalogName} ready`, warnings: [] };
|
||||
},
|
||||
fixAdvice(error) {
|
||||
return {
|
||||
failHeadline: error instanceof Error ? error.message : String(error),
|
||||
remediation: 'Fix the test probe.',
|
||||
};
|
||||
},
|
||||
runCalls: () => calls,
|
||||
};
|
||||
}
|
||||
|
||||
function factories(
|
||||
overrides: Partial<Record<HistoricSqlDialect, HistoricSqlProbeRunner>>,
|
||||
): Record<HistoricSqlDialect, HistoricSqlProbeRunnerFactoryEntry> {
|
||||
const postgres = overrides.postgres ?? fakeRunner('postgres', 'pg_stat_statements');
|
||||
const snowflake =
|
||||
overrides.snowflake ??
|
||||
fakeRunner('snowflake', 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY');
|
||||
const bigquery =
|
||||
overrides.bigquery ?? fakeRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT');
|
||||
|
||||
return {
|
||||
postgres: {
|
||||
catalogName: 'pg_stat_statements',
|
||||
load: vi.fn(async () => postgres),
|
||||
},
|
||||
snowflake: {
|
||||
catalogName: 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
load: vi.fn(async () => snowflake),
|
||||
},
|
||||
bigquery: {
|
||||
catalogName: 'INFORMATION_SCHEMA.JOBS_BY_PROJECT',
|
||||
load: vi.fn(async () => bigquery),
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe('historic-SQL probe registry', () => {
|
||||
it('returns null when the connection has no query-history dialect', async () => {
|
||||
const deps = { factories: factories({}), cache: new Map() };
|
||||
|
||||
await expect(
|
||||
runHistoricSqlReadinessProbe(
|
||||
{
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'mysql',
|
||||
connection: {
|
||||
driver: 'mysql',
|
||||
context: { queryHistory: { enabled: true } },
|
||||
},
|
||||
env: {},
|
||||
},
|
||||
deps,
|
||||
),
|
||||
).resolves.toBeNull();
|
||||
|
||||
expect(deps.factories.postgres.load).not.toHaveBeenCalled();
|
||||
expect(deps.factories.snowflake.load).not.toHaveBeenCalled();
|
||||
expect(deps.factories.bigquery.load).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('dispatches to the dialect runner and caches the runner instance', async () => {
|
||||
const runner = fakeRunner('postgres', 'pg_stat_statements', {
|
||||
result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] },
|
||||
});
|
||||
const deps = { factories: factories({ postgres: runner }), cache: new Map() };
|
||||
const input = {
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'warehouse',
|
||||
connection: {
|
||||
driver: 'postgres',
|
||||
url: 'env:DATABASE_URL',
|
||||
context: { queryHistory: { enabled: true } },
|
||||
},
|
||||
env: {},
|
||||
};
|
||||
|
||||
const first = await runHistoricSqlReadinessProbe(input, deps);
|
||||
const second = await runHistoricSqlReadinessProbe(input, deps);
|
||||
|
||||
expect(first).toMatchObject({ ok: true, dialect: 'postgres', runner });
|
||||
expect(second).toMatchObject({ ok: true, dialect: 'postgres', runner });
|
||||
expect(deps.factories.postgres.load).toHaveBeenCalledTimes(1);
|
||||
expect(runner.runCalls()).toBe(2);
|
||||
});
|
||||
|
||||
it('normalizes runner errors into a failed outcome', async () => {
|
||||
const error = new Error('missing grants');
|
||||
const runner = fakeRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT', {
|
||||
error,
|
||||
});
|
||||
const deps = { factories: factories({ bigquery: runner }), cache: new Map() };
|
||||
|
||||
await expect(
|
||||
runHistoricSqlReadinessProbe(
|
||||
{
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'bq',
|
||||
connection: {
|
||||
driver: 'bigquery',
|
||||
credentials_json: '{"project_id":"project-1"}',
|
||||
context: { queryHistory: { enabled: true } },
|
||||
},
|
||||
env: {},
|
||||
},
|
||||
deps,
|
||||
),
|
||||
).resolves.toEqual({
|
||||
ok: false,
|
||||
dialect: 'bigquery',
|
||||
runner,
|
||||
error,
|
||||
});
|
||||
});
|
||||
|
||||
it('returns catalog names without loading runner modules', () => {
|
||||
const deps = { factories: factories({}), cache: new Map() };
|
||||
|
||||
expect(historicSqlProbeCatalogName('postgres', deps)).toBe('pg_stat_statements');
|
||||
expect(historicSqlProbeCatalogName('snowflake', deps)).toBe(
|
||||
'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
);
|
||||
expect(historicSqlProbeCatalogName('bigquery', deps)).toBe(
|
||||
'INFORMATION_SCHEMA.JOBS_BY_PROJECT',
|
||||
);
|
||||
expect(deps.factories.postgres.load).not.toHaveBeenCalled();
|
||||
expect(deps.factories.snowflake.load).not.toHaveBeenCalled();
|
||||
expect(deps.factories.bigquery.load).not.toHaveBeenCalled();
|
||||
});
|
||||
});
|
||||
141
packages/cli/src/context/ingest/historic-sql-probes.ts
Normal file
141
packages/cli/src/context/ingest/historic-sql-probes.ts
Normal file
|
|
@ -0,0 +1,141 @@
|
|||
import type { KtxProjectConnectionConfig } from '../project/config.js';
|
||||
import { queryHistoryDialectForConnection } from './adapters/historic-sql/connection-dialect.js';
|
||||
import type { HistoricSqlDialect } from './adapters/historic-sql/types.js';
|
||||
|
||||
export interface HistoricSqlFixAdvice {
|
||||
failHeadline: string;
|
||||
remediation: string;
|
||||
}
|
||||
|
||||
export interface HistoricSqlSuccessDetail {
|
||||
detail: string;
|
||||
warnings: string[];
|
||||
}
|
||||
|
||||
export interface HistoricSqlProbeInput {
|
||||
projectDir: string;
|
||||
connectionId: string;
|
||||
connection: KtxProjectConnectionConfig;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
}
|
||||
|
||||
export interface HistoricSqlProbeRunner {
|
||||
readonly dialect: HistoricSqlDialect;
|
||||
readonly catalogName: string;
|
||||
run(input: HistoricSqlProbeInput): Promise<unknown>;
|
||||
formatSuccessDetail(result: unknown): HistoricSqlSuccessDetail;
|
||||
fixAdvice(error: unknown): HistoricSqlFixAdvice;
|
||||
}
|
||||
|
||||
/** @internal */
|
||||
export interface HistoricSqlProbeRunnerFactoryEntry {
|
||||
readonly catalogName: string;
|
||||
load(): Promise<HistoricSqlProbeRunner>;
|
||||
}
|
||||
|
||||
export type HistoricSqlProbeOutcome =
|
||||
| {
|
||||
ok: true;
|
||||
dialect: HistoricSqlDialect;
|
||||
runner: HistoricSqlProbeRunner;
|
||||
result: unknown;
|
||||
}
|
||||
| {
|
||||
ok: false;
|
||||
dialect: HistoricSqlDialect;
|
||||
runner: HistoricSqlProbeRunner;
|
||||
error: unknown;
|
||||
};
|
||||
|
||||
export type HistoricSqlReadinessProbe = (
|
||||
input: HistoricSqlProbeInput,
|
||||
) => Promise<HistoricSqlProbeOutcome | null>;
|
||||
|
||||
export interface HistoricSqlProbeRegistryDeps {
|
||||
factories?: Record<HistoricSqlDialect, HistoricSqlProbeRunnerFactoryEntry>;
|
||||
cache?: Map<HistoricSqlDialect, HistoricSqlProbeRunner>;
|
||||
}
|
||||
|
||||
const defaultHistoricSqlProbeRunnerFactories: Record<
|
||||
HistoricSqlDialect,
|
||||
HistoricSqlProbeRunnerFactoryEntry
|
||||
> = {
|
||||
postgres: {
|
||||
catalogName: 'pg_stat_statements',
|
||||
load: async () => {
|
||||
const { PostgresPgssProbeRunner } = await import(
|
||||
'./historic-sql-probes/postgres-runner.js'
|
||||
);
|
||||
return new PostgresPgssProbeRunner();
|
||||
},
|
||||
},
|
||||
snowflake: {
|
||||
catalogName: 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
load: async () => {
|
||||
const { SnowflakeAccountUsageProbeRunner } = await import(
|
||||
'./historic-sql-probes/snowflake-runner.js'
|
||||
);
|
||||
return new SnowflakeAccountUsageProbeRunner();
|
||||
},
|
||||
},
|
||||
bigquery: {
|
||||
catalogName: 'INFORMATION_SCHEMA.JOBS_BY_PROJECT',
|
||||
load: async () => {
|
||||
const { BigQueryJobsByProjectProbeRunner } = await import(
|
||||
'./historic-sql-probes/bigquery-runner.js'
|
||||
);
|
||||
return new BigQueryJobsByProjectProbeRunner();
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
const DEFAULT_RUNNER_CACHE = new Map<HistoricSqlDialect, HistoricSqlProbeRunner>();
|
||||
|
||||
function registryDeps(input: HistoricSqlProbeRegistryDeps) {
|
||||
return {
|
||||
factories: input.factories ?? defaultHistoricSqlProbeRunnerFactories,
|
||||
cache: input.cache ?? DEFAULT_RUNNER_CACHE,
|
||||
};
|
||||
}
|
||||
|
||||
export function historicSqlProbeCatalogName(
|
||||
dialect: HistoricSqlDialect,
|
||||
deps: HistoricSqlProbeRegistryDeps = {},
|
||||
): string {
|
||||
return registryDeps(deps).factories[dialect].catalogName;
|
||||
}
|
||||
|
||||
async function loadHistoricSqlProbeRunner(
|
||||
dialect: HistoricSqlDialect,
|
||||
deps: HistoricSqlProbeRegistryDeps = {},
|
||||
): Promise<HistoricSqlProbeRunner> {
|
||||
const { factories, cache } = registryDeps(deps);
|
||||
const cached = cache.get(dialect);
|
||||
if (cached) {
|
||||
return cached;
|
||||
}
|
||||
const runner = await factories[dialect].load();
|
||||
cache.set(dialect, runner);
|
||||
return runner;
|
||||
}
|
||||
|
||||
export async function runHistoricSqlReadinessProbe(
|
||||
input: HistoricSqlProbeInput,
|
||||
deps: HistoricSqlProbeRegistryDeps = {},
|
||||
): Promise<HistoricSqlProbeOutcome | null> {
|
||||
const dialect = queryHistoryDialectForConnection(input.connection);
|
||||
if (!dialect) {
|
||||
return null;
|
||||
}
|
||||
const runner = await loadHistoricSqlProbeRunner(dialect, deps);
|
||||
try {
|
||||
return {
|
||||
ok: true,
|
||||
dialect,
|
||||
runner,
|
||||
result: await runner.run(input),
|
||||
};
|
||||
} catch (error) {
|
||||
return { ok: false, dialect, runner, error };
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,110 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { HistoricSqlGrantsMissingError } from '../adapters/historic-sql/errors.js';
|
||||
import { BigQueryJobsByProjectProbeRunner } from './bigquery-runner.js';
|
||||
|
||||
describe('BigQueryJobsByProjectProbeRunner', () => {
|
||||
it('creates a region-scoped reader, runs it, and cleans up the connector', async () => {
|
||||
const cleanup = vi.fn(async () => undefined);
|
||||
const reader = {
|
||||
probe: vi.fn(async () => ({ warnings: [], info: ['region: eu'] })),
|
||||
};
|
||||
const createReader = vi.fn(() => reader);
|
||||
const runner = new BigQueryJobsByProjectProbeRunner({
|
||||
createReader,
|
||||
createClient: () => ({ client: { executeQuery: vi.fn() }, cleanup }),
|
||||
resolveReference: () => '{"project_id":"project-1"}',
|
||||
});
|
||||
|
||||
await expect(
|
||||
runner.run({
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'bq',
|
||||
connection: {
|
||||
driver: 'bigquery',
|
||||
credentials_json: 'env:BQ_CREDENTIALS_JSON',
|
||||
location: 'EU',
|
||||
},
|
||||
env: {},
|
||||
}),
|
||||
).resolves.toEqual({ warnings: [], info: ['region: eu'] });
|
||||
expect(createReader).toHaveBeenCalledWith({ projectId: 'project-1', region: 'EU' });
|
||||
expect(reader.probe).toHaveBeenCalledOnce();
|
||||
expect(cleanup).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('uses us as the default BigQuery region', async () => {
|
||||
const createReader = vi.fn(() => ({
|
||||
probe: vi.fn(async () => ({ warnings: [], info: [] })),
|
||||
}));
|
||||
const runner = new BigQueryJobsByProjectProbeRunner({
|
||||
createReader,
|
||||
createClient: () => ({ client: {}, cleanup: vi.fn(async () => undefined) }),
|
||||
resolveReference: () => '{"project_id":"project-1"}',
|
||||
});
|
||||
|
||||
await runner.run({
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'bq',
|
||||
connection: {
|
||||
driver: 'bigquery',
|
||||
credentials_json: '{"project_id":"project-1"}',
|
||||
},
|
||||
env: {},
|
||||
});
|
||||
|
||||
expect(createReader).toHaveBeenCalledWith({ projectId: 'project-1', region: 'us' });
|
||||
});
|
||||
|
||||
it('rejects missing BigQuery credentials_json.project_id', async () => {
|
||||
const runner = new BigQueryJobsByProjectProbeRunner({
|
||||
createReader: vi.fn(),
|
||||
createClient: () => ({ client: {}, cleanup: vi.fn() }),
|
||||
resolveReference: () => '{"client_email":"svc@example.test"}',
|
||||
});
|
||||
|
||||
await expect(
|
||||
runner.run({
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'bq',
|
||||
connection: {
|
||||
driver: 'bigquery',
|
||||
credentials_json: 'env:BQ_CREDENTIALS_JSON',
|
||||
},
|
||||
env: {},
|
||||
}),
|
||||
).rejects.toThrow('Query history BigQuery connection bq requires credentials_json.project_id');
|
||||
});
|
||||
|
||||
it('formats successful BigQuery details', () => {
|
||||
const runner = new BigQueryJobsByProjectProbeRunner();
|
||||
|
||||
expect(
|
||||
runner.formatSuccessDetail({
|
||||
warnings: ['JOBS_BY_PROJECT is delayed'],
|
||||
info: ['region: us'],
|
||||
}),
|
||||
).toEqual({
|
||||
detail: 'INFORMATION_SCHEMA.JOBS_BY_PROJECT ready; region: us',
|
||||
warnings: ['JOBS_BY_PROJECT is delayed'],
|
||||
});
|
||||
});
|
||||
|
||||
it('maps BigQuery grant errors to runner advice', () => {
|
||||
const runner = new BigQueryJobsByProjectProbeRunner();
|
||||
|
||||
expect(
|
||||
runner.fixAdvice(
|
||||
new HistoricSqlGrantsMissingError({
|
||||
dialect: 'bigquery',
|
||||
message: 'principal cannot query JOBS_BY_PROJECT',
|
||||
remediation:
|
||||
'Grant roles/bigquery.resourceViewer on the BigQuery project, or grant a custom role containing bigquery.jobs.listAll.',
|
||||
}),
|
||||
),
|
||||
).toEqual({
|
||||
failHeadline: 'BigQuery principal cannot read INFORMATION_SCHEMA.JOBS_BY_PROJECT',
|
||||
remediation:
|
||||
'Grant roles/bigquery.resourceViewer on the BigQuery project, or grant a custom role containing bigquery.jobs.listAll.',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,160 @@
|
|||
import { HistoricSqlGrantsMissingError } from '../adapters/historic-sql/errors.js';
|
||||
import { BigQueryHistoricSqlQueryHistoryReader } from '../adapters/historic-sql/bigquery-query-history-reader.js';
|
||||
import {
|
||||
type HistoricSqlFixAdvice,
|
||||
type HistoricSqlProbeInput,
|
||||
type HistoricSqlProbeRunner,
|
||||
type HistoricSqlSuccessDetail,
|
||||
} from '../historic-sql-probes.js';
|
||||
import { resolveKtxConfigReference } from '../../core/config-reference.js';
|
||||
import {
|
||||
isKtxBigQueryConnectionConfig,
|
||||
KtxBigQueryScanConnector,
|
||||
type KtxBigQueryConnectionConfig,
|
||||
} from '../../../connectors/bigquery/connector.js';
|
||||
|
||||
interface GenericProbeResult {
|
||||
warnings: string[];
|
||||
info?: string[];
|
||||
}
|
||||
|
||||
interface ClientHandle {
|
||||
client: unknown;
|
||||
cleanup(): Promise<void>;
|
||||
}
|
||||
|
||||
interface BigQueryJobsByProjectProbeRunnerOptions {
|
||||
createReader?: (options: { projectId: string; region: string }) => {
|
||||
probe(client: unknown): Promise<GenericProbeResult>;
|
||||
};
|
||||
createClient?: (
|
||||
input: HistoricSqlProbeInput & { connection: KtxBigQueryConnectionConfig },
|
||||
) => ClientHandle;
|
||||
resolveReference?: (value: string | undefined, env: NodeJS.ProcessEnv) => string | undefined;
|
||||
}
|
||||
|
||||
function bigQueryProjectId(
|
||||
connectionId: string,
|
||||
connection: KtxBigQueryConnectionConfig,
|
||||
env: NodeJS.ProcessEnv,
|
||||
resolveReference: (value: string | undefined, env: NodeJS.ProcessEnv) => string | undefined,
|
||||
): string {
|
||||
const rawCredentials =
|
||||
typeof connection.credentials_json === 'string' ? connection.credentials_json : '';
|
||||
const resolvedCredentials = resolveReference(rawCredentials, env);
|
||||
if (!resolvedCredentials) {
|
||||
throw new Error(`Query history BigQuery connection ${connectionId} requires credentials_json`);
|
||||
}
|
||||
const parsed = JSON.parse(resolvedCredentials) as { project_id?: unknown };
|
||||
if (typeof parsed.project_id !== 'string' || parsed.project_id.trim().length === 0) {
|
||||
throw new Error(
|
||||
`Query history BigQuery connection ${connectionId} requires credentials_json.project_id`,
|
||||
);
|
||||
}
|
||||
return parsed.project_id;
|
||||
}
|
||||
|
||||
function bigQueryRegion(connection: KtxBigQueryConnectionConfig): string {
|
||||
return typeof connection.location === 'string' && connection.location.trim().length > 0
|
||||
? connection.location.trim()
|
||||
: 'us';
|
||||
}
|
||||
|
||||
function infoSuffix(info: readonly string[] | undefined): string {
|
||||
return info && info.length > 0 ? `; ${info.join('; ')}` : '';
|
||||
}
|
||||
|
||||
export class BigQueryJobsByProjectProbeRunner implements HistoricSqlProbeRunner {
|
||||
readonly dialect = 'bigquery' as const;
|
||||
readonly catalogName = 'INFORMATION_SCHEMA.JOBS_BY_PROJECT';
|
||||
|
||||
private readonly createReader: (options: { projectId: string; region: string }) => {
|
||||
probe(client: unknown): Promise<GenericProbeResult>;
|
||||
};
|
||||
private readonly createClient: (
|
||||
input: HistoricSqlProbeInput & { connection: KtxBigQueryConnectionConfig },
|
||||
) => ClientHandle;
|
||||
private readonly resolveReference: (
|
||||
value: string | undefined,
|
||||
env: NodeJS.ProcessEnv,
|
||||
) => string | undefined;
|
||||
|
||||
constructor(options: BigQueryJobsByProjectProbeRunnerOptions = {}) {
|
||||
this.createReader =
|
||||
options.createReader ??
|
||||
((readerOptions) => new BigQueryHistoricSqlQueryHistoryReader(readerOptions));
|
||||
this.createClient =
|
||||
options.createClient ??
|
||||
((input) => {
|
||||
const connector = new KtxBigQueryScanConnector({
|
||||
connectionId: input.connectionId,
|
||||
connection: input.connection,
|
||||
env: input.env,
|
||||
});
|
||||
return {
|
||||
client: {
|
||||
async executeQuery(sql: string) {
|
||||
const result = await connector.executeReadOnly(
|
||||
{ connectionId: input.connectionId, sql },
|
||||
{} as never,
|
||||
);
|
||||
return {
|
||||
headers: result.headers,
|
||||
rows: result.rows,
|
||||
totalRows: result.totalRows,
|
||||
};
|
||||
},
|
||||
},
|
||||
cleanup: () => connector.cleanup(),
|
||||
};
|
||||
});
|
||||
this.resolveReference = options.resolveReference ?? resolveKtxConfigReference;
|
||||
}
|
||||
|
||||
async run(input: HistoricSqlProbeInput): Promise<GenericProbeResult> {
|
||||
const inputDriver = input.connection.driver ?? 'unknown';
|
||||
if (!isKtxBigQueryConnectionConfig(input.connection)) {
|
||||
throw new Error(`Native BigQuery connector cannot run driver "${inputDriver}"`);
|
||||
}
|
||||
const projectId = bigQueryProjectId(
|
||||
input.connectionId,
|
||||
input.connection,
|
||||
input.env ?? process.env,
|
||||
this.resolveReference,
|
||||
);
|
||||
const reader = this.createReader({
|
||||
projectId,
|
||||
region: bigQueryRegion(input.connection),
|
||||
});
|
||||
const handle = this.createClient({
|
||||
...input,
|
||||
connection: input.connection,
|
||||
});
|
||||
try {
|
||||
return await reader.probe(handle.client);
|
||||
} finally {
|
||||
await handle.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
formatSuccessDetail(result: unknown): HistoricSqlSuccessDetail {
|
||||
const probeResult = result as GenericProbeResult;
|
||||
return {
|
||||
detail: `${this.catalogName} ready${infoSuffix(probeResult.info)}`,
|
||||
warnings: probeResult.warnings,
|
||||
};
|
||||
}
|
||||
|
||||
fixAdvice(error: unknown): HistoricSqlFixAdvice {
|
||||
if (error instanceof HistoricSqlGrantsMissingError) {
|
||||
return {
|
||||
failHeadline: 'BigQuery principal cannot read INFORMATION_SCHEMA.JOBS_BY_PROJECT',
|
||||
remediation: error.remediation,
|
||||
};
|
||||
}
|
||||
return {
|
||||
failHeadline: `${this.catalogName} readiness check failed`,
|
||||
remediation: error instanceof Error ? error.message : String(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,113 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
import {
|
||||
HistoricSqlExtensionMissingError,
|
||||
HistoricSqlGrantsMissingError,
|
||||
HistoricSqlVersionUnsupportedError,
|
||||
} from '../adapters/historic-sql/errors.js';
|
||||
import { PostgresPgssProbeRunner } from './postgres-runner.js';
|
||||
|
||||
describe('PostgresPgssProbeRunner', () => {
|
||||
it('runs the pg_stat_statements reader and cleans up the client', async () => {
|
||||
const cleanup = vi.fn(async () => undefined);
|
||||
const reader = {
|
||||
probe: vi.fn(async () => ({
|
||||
pgServerVersion: 'PostgreSQL 16.4',
|
||||
warnings: [],
|
||||
info: ['tracked statements: 12'],
|
||||
})),
|
||||
};
|
||||
const runner = new PostgresPgssProbeRunner({
|
||||
reader,
|
||||
createClient: () => ({ client: { executeQuery: vi.fn() }, cleanup }),
|
||||
});
|
||||
|
||||
await expect(
|
||||
runner.run({
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'warehouse',
|
||||
connection: { driver: 'postgres', url: 'env:DATABASE_URL' },
|
||||
env: {},
|
||||
}),
|
||||
).resolves.toEqual({
|
||||
pgServerVersion: 'PostgreSQL 16.4',
|
||||
warnings: [],
|
||||
info: ['tracked statements: 12'],
|
||||
});
|
||||
expect(reader.probe).toHaveBeenCalledOnce();
|
||||
expect(cleanup).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('rejects non-Postgres connections', async () => {
|
||||
const runner = new PostgresPgssProbeRunner({
|
||||
reader: { probe: vi.fn() },
|
||||
createClient: () => ({ client: {}, cleanup: vi.fn() }),
|
||||
});
|
||||
|
||||
await expect(
|
||||
runner.run({
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'warehouse',
|
||||
connection: { driver: 'snowflake' },
|
||||
env: {},
|
||||
}),
|
||||
).rejects.toThrow('Native PostgreSQL connector cannot run driver "snowflake"');
|
||||
});
|
||||
|
||||
it('formats successful Postgres details', () => {
|
||||
const runner = new PostgresPgssProbeRunner();
|
||||
|
||||
expect(
|
||||
runner.formatSuccessDetail({
|
||||
pgServerVersion: 'PostgreSQL 16.4',
|
||||
warnings: ['pg_stat_statements.track is top'],
|
||||
info: ['tracked statements: 12'],
|
||||
}),
|
||||
).toEqual({
|
||||
detail: 'pg_stat_statements ready (PostgreSQL 16.4); tracked statements: 12',
|
||||
warnings: ['pg_stat_statements.track is top'],
|
||||
});
|
||||
});
|
||||
|
||||
it('maps Postgres probe errors to actionable advice', () => {
|
||||
const runner = new PostgresPgssProbeRunner();
|
||||
|
||||
expect(
|
||||
runner.fixAdvice(
|
||||
new HistoricSqlExtensionMissingError({
|
||||
dialect: 'postgres',
|
||||
message: 'pg_stat_statements missing',
|
||||
remediation: 'CREATE EXTENSION pg_stat_statements;',
|
||||
}),
|
||||
),
|
||||
).toEqual({
|
||||
failHeadline: 'pg_stat_statements extension is missing',
|
||||
remediation: 'CREATE EXTENSION pg_stat_statements;',
|
||||
});
|
||||
|
||||
expect(
|
||||
runner.fixAdvice(
|
||||
new HistoricSqlGrantsMissingError({
|
||||
dialect: 'postgres',
|
||||
message: 'missing grants',
|
||||
remediation: 'GRANT pg_read_all_stats TO <connection role>;',
|
||||
}),
|
||||
),
|
||||
).toEqual({
|
||||
failHeadline: 'Postgres connection role lacks pg_read_all_stats',
|
||||
remediation: 'GRANT pg_read_all_stats TO <connection role>;',
|
||||
});
|
||||
|
||||
expect(
|
||||
runner.fixAdvice(
|
||||
new HistoricSqlVersionUnsupportedError({
|
||||
dialect: 'postgres',
|
||||
detectedVersion: 'PostgreSQL 13.12',
|
||||
minimumVersion: 'PostgreSQL 14',
|
||||
}),
|
||||
),
|
||||
).toEqual({
|
||||
failHeadline: 'Postgres version too old',
|
||||
remediation: 'Use PostgreSQL 14 or newer, or disable query history for this connection',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,111 @@
|
|||
import {
|
||||
HistoricSqlExtensionMissingError,
|
||||
HistoricSqlGrantsMissingError,
|
||||
HistoricSqlVersionUnsupportedError,
|
||||
} from '../adapters/historic-sql/errors.js';
|
||||
import { PostgresPgssReader } from '../adapters/historic-sql/postgres-pgss-reader.js';
|
||||
import type { PostgresPgssProbeResult } from '../adapters/historic-sql/types.js';
|
||||
import {
|
||||
type HistoricSqlFixAdvice,
|
||||
type HistoricSqlProbeInput,
|
||||
type HistoricSqlProbeRunner,
|
||||
type HistoricSqlSuccessDetail,
|
||||
} from '../historic-sql-probes.js';
|
||||
import {
|
||||
isKtxPostgresConnectionConfig,
|
||||
type KtxPostgresConnectionConfig,
|
||||
} from '../../../connectors/postgres/connector.js';
|
||||
import { KtxPostgresHistoricSqlQueryClient } from '../../../connectors/postgres/historic-sql-query-client.js';
|
||||
|
||||
interface ClientHandle {
|
||||
client: unknown;
|
||||
cleanup(): Promise<void>;
|
||||
}
|
||||
|
||||
interface PostgresPgssProbeRunnerOptions {
|
||||
reader?: { probe(client: unknown): Promise<PostgresPgssProbeResult> };
|
||||
createClient?: (
|
||||
input: HistoricSqlProbeInput & { connection: KtxPostgresConnectionConfig },
|
||||
) => ClientHandle;
|
||||
}
|
||||
|
||||
function genericAdvice(error: unknown, catalogName: string): HistoricSqlFixAdvice {
|
||||
return {
|
||||
failHeadline: `${catalogName} readiness check failed`,
|
||||
remediation: error instanceof Error ? error.message : String(error),
|
||||
};
|
||||
}
|
||||
|
||||
function infoSuffix(info: readonly string[] | undefined): string {
|
||||
return info && info.length > 0 ? `; ${info.join('; ')}` : '';
|
||||
}
|
||||
|
||||
export class PostgresPgssProbeRunner implements HistoricSqlProbeRunner {
|
||||
readonly dialect = 'postgres' as const;
|
||||
readonly catalogName = 'pg_stat_statements';
|
||||
|
||||
private readonly reader: { probe(client: unknown): Promise<PostgresPgssProbeResult> };
|
||||
private readonly createClient: (
|
||||
input: HistoricSqlProbeInput & { connection: KtxPostgresConnectionConfig },
|
||||
) => ClientHandle;
|
||||
|
||||
constructor(options: PostgresPgssProbeRunnerOptions = {}) {
|
||||
this.reader = options.reader ?? new PostgresPgssReader();
|
||||
this.createClient =
|
||||
options.createClient ??
|
||||
((input) => {
|
||||
const client = new KtxPostgresHistoricSqlQueryClient({
|
||||
connectionId: input.connectionId,
|
||||
connection: input.connection,
|
||||
env: input.env,
|
||||
});
|
||||
return { client, cleanup: () => client.cleanup() };
|
||||
});
|
||||
}
|
||||
|
||||
async run(input: HistoricSqlProbeInput): Promise<PostgresPgssProbeResult> {
|
||||
const inputDriver = input.connection.driver ?? 'unknown';
|
||||
if (!isKtxPostgresConnectionConfig(input.connection)) {
|
||||
throw new Error(`Native PostgreSQL connector cannot run driver "${inputDriver}"`);
|
||||
}
|
||||
const handle = this.createClient({
|
||||
...input,
|
||||
connection: input.connection,
|
||||
});
|
||||
try {
|
||||
return await this.reader.probe(handle.client);
|
||||
} finally {
|
||||
await handle.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
formatSuccessDetail(result: unknown): HistoricSqlSuccessDetail {
|
||||
const pgssResult = result as PostgresPgssProbeResult;
|
||||
return {
|
||||
detail: `pg_stat_statements ready (${pgssResult.pgServerVersion})${infoSuffix(pgssResult.info)}`,
|
||||
warnings: pgssResult.warnings,
|
||||
};
|
||||
}
|
||||
|
||||
fixAdvice(error: unknown): HistoricSqlFixAdvice {
|
||||
if (error instanceof HistoricSqlExtensionMissingError) {
|
||||
return {
|
||||
failHeadline: 'pg_stat_statements extension is missing',
|
||||
remediation: error.remediation,
|
||||
};
|
||||
}
|
||||
if (error instanceof HistoricSqlGrantsMissingError) {
|
||||
return {
|
||||
failHeadline: 'Postgres connection role lacks pg_read_all_stats',
|
||||
remediation: error.remediation,
|
||||
};
|
||||
}
|
||||
if (error instanceof HistoricSqlVersionUnsupportedError) {
|
||||
return {
|
||||
failHeadline: 'Postgres version too old',
|
||||
remediation: 'Use PostgreSQL 14 or newer, or disable query history for this connection',
|
||||
};
|
||||
}
|
||||
return genericAdvice(error, this.catalogName);
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,82 @@
|
|||
import { describe, expect, it, vi } from 'vitest';
|
||||
import { HistoricSqlGrantsMissingError } from '../adapters/historic-sql/errors.js';
|
||||
import { SnowflakeAccountUsageProbeRunner } from './snowflake-runner.js';
|
||||
|
||||
describe('SnowflakeAccountUsageProbeRunner', () => {
|
||||
it('runs the account usage reader and cleans up the client', async () => {
|
||||
const cleanup = vi.fn(async () => undefined);
|
||||
const reader = {
|
||||
probe: vi.fn(async () => ({ warnings: [], info: ['query history available'] })),
|
||||
};
|
||||
const runner = new SnowflakeAccountUsageProbeRunner({
|
||||
reader,
|
||||
createClient: () => ({ client: { executeQuery: vi.fn() }, cleanup }),
|
||||
});
|
||||
|
||||
await expect(
|
||||
runner.run({
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'warehouse',
|
||||
connection: {
|
||||
driver: 'snowflake',
|
||||
account: 'ACCT',
|
||||
warehouse: 'WH',
|
||||
database: 'ANALYTICS',
|
||||
username: 'reader',
|
||||
},
|
||||
env: {},
|
||||
}),
|
||||
).resolves.toEqual({ warnings: [], info: ['query history available'] });
|
||||
expect(reader.probe).toHaveBeenCalledOnce();
|
||||
expect(cleanup).toHaveBeenCalledOnce();
|
||||
});
|
||||
|
||||
it('rejects non-Snowflake connections', async () => {
|
||||
const runner = new SnowflakeAccountUsageProbeRunner({
|
||||
reader: { probe: vi.fn() },
|
||||
createClient: () => ({ client: {}, cleanup: vi.fn() }),
|
||||
});
|
||||
|
||||
await expect(
|
||||
runner.run({
|
||||
projectDir: '/work/project',
|
||||
connectionId: 'warehouse',
|
||||
connection: { driver: 'postgres' },
|
||||
env: {},
|
||||
}),
|
||||
).rejects.toThrow('Native Snowflake connector cannot run driver "postgres"');
|
||||
});
|
||||
|
||||
it('formats successful Snowflake details', () => {
|
||||
const runner = new SnowflakeAccountUsageProbeRunner();
|
||||
|
||||
expect(
|
||||
runner.formatSuccessDetail({
|
||||
warnings: ['query history is delayed'],
|
||||
info: ['warehouse: WH'],
|
||||
}),
|
||||
).toEqual({
|
||||
detail: 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY ready; warehouse: WH',
|
||||
warnings: ['query history is delayed'],
|
||||
});
|
||||
});
|
||||
|
||||
it('maps Snowflake grant errors to runner advice', () => {
|
||||
const runner = new SnowflakeAccountUsageProbeRunner();
|
||||
|
||||
expect(
|
||||
runner.fixAdvice(
|
||||
new HistoricSqlGrantsMissingError({
|
||||
dialect: 'snowflake',
|
||||
message: 'role cannot read account usage',
|
||||
remediation:
|
||||
'GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE <connection role>;',
|
||||
}),
|
||||
),
|
||||
).toEqual({
|
||||
failHeadline: 'Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
remediation:
|
||||
'GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE <connection role>;',
|
||||
});
|
||||
});
|
||||
});
|
||||
|
|
@ -0,0 +1,96 @@
|
|||
import { HistoricSqlGrantsMissingError } from '../adapters/historic-sql/errors.js';
|
||||
import { SnowflakeHistoricSqlQueryHistoryReader } from '../adapters/historic-sql/snowflake-query-history-reader.js';
|
||||
import {
|
||||
type HistoricSqlFixAdvice,
|
||||
type HistoricSqlProbeInput,
|
||||
type HistoricSqlProbeRunner,
|
||||
type HistoricSqlSuccessDetail,
|
||||
} from '../historic-sql-probes.js';
|
||||
import {
|
||||
isKtxSnowflakeConnectionConfig,
|
||||
type KtxSnowflakeConnectionConfig,
|
||||
} from '../../../connectors/snowflake/connector.js';
|
||||
import { KtxSnowflakeHistoricSqlQueryClient } from '../../../connectors/snowflake/historic-sql-query-client.js';
|
||||
|
||||
interface GenericProbeResult {
|
||||
warnings: string[];
|
||||
info?: string[];
|
||||
}
|
||||
|
||||
interface ClientHandle {
|
||||
client: unknown;
|
||||
cleanup(): Promise<void>;
|
||||
}
|
||||
|
||||
interface SnowflakeAccountUsageProbeRunnerOptions {
|
||||
reader?: { probe(client: unknown): Promise<GenericProbeResult> };
|
||||
createClient?: (
|
||||
input: HistoricSqlProbeInput & { connection: KtxSnowflakeConnectionConfig },
|
||||
) => ClientHandle;
|
||||
}
|
||||
|
||||
function infoSuffix(info: readonly string[] | undefined): string {
|
||||
return info && info.length > 0 ? `; ${info.join('; ')}` : '';
|
||||
}
|
||||
|
||||
export class SnowflakeAccountUsageProbeRunner implements HistoricSqlProbeRunner {
|
||||
readonly dialect = 'snowflake' as const;
|
||||
readonly catalogName = 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY';
|
||||
|
||||
private readonly reader: { probe(client: unknown): Promise<GenericProbeResult> };
|
||||
private readonly createClient: (
|
||||
input: HistoricSqlProbeInput & { connection: KtxSnowflakeConnectionConfig },
|
||||
) => ClientHandle;
|
||||
|
||||
constructor(options: SnowflakeAccountUsageProbeRunnerOptions = {}) {
|
||||
this.reader = options.reader ?? new SnowflakeHistoricSqlQueryHistoryReader();
|
||||
this.createClient =
|
||||
options.createClient ??
|
||||
((input) => {
|
||||
const client = new KtxSnowflakeHistoricSqlQueryClient({
|
||||
connectionId: input.connectionId,
|
||||
connection: input.connection,
|
||||
projectDir: input.projectDir,
|
||||
env: input.env,
|
||||
});
|
||||
return { client, cleanup: () => client.cleanup() };
|
||||
});
|
||||
}
|
||||
|
||||
async run(input: HistoricSqlProbeInput): Promise<GenericProbeResult> {
|
||||
const inputDriver = input.connection.driver ?? 'unknown';
|
||||
if (!isKtxSnowflakeConnectionConfig(input.connection)) {
|
||||
throw new Error(`Native Snowflake connector cannot run driver "${inputDriver}"`);
|
||||
}
|
||||
const handle = this.createClient({
|
||||
...input,
|
||||
connection: input.connection,
|
||||
});
|
||||
try {
|
||||
return await this.reader.probe(handle.client);
|
||||
} finally {
|
||||
await handle.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
formatSuccessDetail(result: unknown): HistoricSqlSuccessDetail {
|
||||
const probeResult = result as GenericProbeResult;
|
||||
return {
|
||||
detail: `${this.catalogName} ready${infoSuffix(probeResult.info)}`,
|
||||
warnings: probeResult.warnings,
|
||||
};
|
||||
}
|
||||
|
||||
fixAdvice(error: unknown): HistoricSqlFixAdvice {
|
||||
if (error instanceof HistoricSqlGrantsMissingError) {
|
||||
return {
|
||||
failHeadline: 'Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
remediation: error.remediation,
|
||||
};
|
||||
}
|
||||
return {
|
||||
failHeadline: `${this.catalogName} readiness check failed`,
|
||||
remediation: error instanceof Error ? error.message : String(error),
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
@ -2068,9 +2068,40 @@ describe('setup databases step', () => {
|
|||
expect(io.stdout()).toContain('│ Changes: 0 changes across 56 tables');
|
||||
});
|
||||
|
||||
function fakeHistoricSqlRunner(
|
||||
dialect: 'postgres' | 'snowflake' | 'bigquery',
|
||||
catalogName: string,
|
||||
) {
|
||||
return {
|
||||
dialect,
|
||||
catalogName,
|
||||
async run() {
|
||||
return { warnings: [], info: [] };
|
||||
},
|
||||
formatSuccessDetail() {
|
||||
return { detail: `${catalogName} ready`, warnings: [] };
|
||||
},
|
||||
fixAdvice() {
|
||||
return {
|
||||
failHeadline: `${catalogName} unavailable`,
|
||||
remediation: 'Fix query-history grants.',
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
it('writes query history config for supported Snowflake databases after validation succeeds', async () => {
|
||||
const io = makeIo();
|
||||
const historicSqlProbe = vi.fn(async () => ({ ok: true, lines: [] }));
|
||||
const runner = fakeHistoricSqlRunner(
|
||||
'snowflake',
|
||||
'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
);
|
||||
const historicSqlReadinessProbe = vi.fn(async () => ({
|
||||
ok: true as const,
|
||||
dialect: 'snowflake' as const,
|
||||
runner,
|
||||
result: { warnings: [], info: [] },
|
||||
}));
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
{
|
||||
projectDir: tempDir,
|
||||
|
|
@ -2088,7 +2119,7 @@ describe('setup databases step', () => {
|
|||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe,
|
||||
historicSqlReadinessProbe,
|
||||
prompts: makePromptAdapter({
|
||||
selectValues: ['password'],
|
||||
textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''],
|
||||
|
|
@ -2096,11 +2127,11 @@ describe('setup databases step', () => {
|
|||
}),
|
||||
},
|
||||
);
|
||||
expect(historicSqlProbe).toHaveBeenCalledWith(
|
||||
expect(historicSqlReadinessProbe).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
projectDir: tempDir,
|
||||
connectionId: 'snowflake',
|
||||
dialect: 'snowflake',
|
||||
connection: expect.objectContaining({ driver: 'snowflake' }),
|
||||
}),
|
||||
);
|
||||
|
||||
|
|
@ -2198,7 +2229,15 @@ describe('setup databases step', () => {
|
|||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe: vi.fn(async () => ({ ok: true, lines: [' OK pg_stat_statements ready (PostgreSQL 16.4)'] })),
|
||||
historicSqlReadinessProbe: vi.fn(async () => {
|
||||
const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements');
|
||||
return {
|
||||
ok: true as const,
|
||||
dialect: 'postgres' as const,
|
||||
runner,
|
||||
result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] },
|
||||
};
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
|
|
@ -2269,7 +2308,13 @@ describe('setup databases step', () => {
|
|||
);
|
||||
const io = makeIo();
|
||||
const prompts = makePromptAdapter({ selectValues: ['yes', 'deep'] });
|
||||
const historicSqlProbe = vi.fn(async () => ({ ok: true, lines: [] }));
|
||||
const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements');
|
||||
const historicSqlReadinessProbe = vi.fn(async () => ({
|
||||
ok: true as const,
|
||||
dialect: 'postgres' as const,
|
||||
runner,
|
||||
result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] },
|
||||
}));
|
||||
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
{
|
||||
|
|
@ -2284,7 +2329,7 @@ describe('setup databases step', () => {
|
|||
prompts,
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe,
|
||||
historicSqlReadinessProbe,
|
||||
},
|
||||
);
|
||||
|
||||
|
|
@ -2303,11 +2348,13 @@ describe('setup databases step', () => {
|
|||
message: expect.stringContaining('How much database context should KTX build?'),
|
||||
}),
|
||||
);
|
||||
expect(historicSqlProbe).toHaveBeenCalledWith({
|
||||
projectDir: tempDir,
|
||||
connectionId: 'warehouse',
|
||||
dialect: 'postgres',
|
||||
});
|
||||
expect(historicSqlReadinessProbe).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
projectDir: tempDir,
|
||||
connectionId: 'warehouse',
|
||||
connection: expect.objectContaining({ driver: 'postgres' }),
|
||||
}),
|
||||
);
|
||||
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
|
||||
expect(config.connections.warehouse).toMatchObject({
|
||||
context: {
|
||||
|
|
@ -2335,6 +2382,13 @@ describe('setup databases step', () => {
|
|||
'utf-8',
|
||||
);
|
||||
const io = makeIo();
|
||||
const runner = fakeHistoricSqlRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT');
|
||||
const historicSqlReadinessProbe = vi.fn(async () => ({
|
||||
ok: true as const,
|
||||
dialect: 'bigquery' as const,
|
||||
runner,
|
||||
result: { warnings: [], info: [] },
|
||||
}));
|
||||
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
{
|
||||
|
|
@ -2350,10 +2404,18 @@ describe('setup databases step', () => {
|
|||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlReadinessProbe,
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe('ready');
|
||||
expect(historicSqlReadinessProbe).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
projectDir: tempDir,
|
||||
connectionId: 'analytics',
|
||||
connection: expect.objectContaining({ driver: 'bigquery' }),
|
||||
}),
|
||||
);
|
||||
const configText = await readFile(join(tempDir, 'ktx.yaml'), 'utf-8');
|
||||
const config = parseKtxProjectConfig(configText);
|
||||
expect(config.connections.analytics).toMatchObject({
|
||||
|
|
@ -2375,6 +2437,71 @@ describe('setup databases step', () => {
|
|||
expect(config.ingest.adapters).toEqual([]);
|
||||
});
|
||||
|
||||
it('prints a non-blocking BigQuery query history probe failure with the grants remediation', async () => {
|
||||
await writeFile(
|
||||
join(tempDir, 'ktx.yaml'),
|
||||
[
|
||||
'connections:',
|
||||
' analytics:',
|
||||
' driver: bigquery',
|
||||
' dataset_id: analytics',
|
||||
' credentials_json: env:BIGQUERY_CREDENTIALS_JSON',
|
||||
'',
|
||||
].join('\n'),
|
||||
'utf-8',
|
||||
);
|
||||
const io = makeIo();
|
||||
const runner = {
|
||||
...fakeHistoricSqlRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT'),
|
||||
fixAdvice: () => ({
|
||||
failHeadline: 'BigQuery principal cannot read INFORMATION_SCHEMA.JOBS_BY_PROJECT',
|
||||
remediation:
|
||||
'Grant roles/bigquery.resourceViewer on the BigQuery project, or grant a custom role containing bigquery.jobs.listAll.',
|
||||
}),
|
||||
};
|
||||
const error = new Error('access denied');
|
||||
const historicSqlReadinessProbe = vi.fn(async () => ({
|
||||
ok: false as const,
|
||||
dialect: 'bigquery' as const,
|
||||
runner,
|
||||
error,
|
||||
}));
|
||||
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
{
|
||||
projectDir: tempDir,
|
||||
inputMode: 'disabled',
|
||||
databaseConnectionIds: ['analytics'],
|
||||
databaseSchemas: [],
|
||||
enableQueryHistory: true,
|
||||
queryHistoryWindowDays: 45,
|
||||
skipDatabases: false,
|
||||
},
|
||||
io.io,
|
||||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlReadinessProbe,
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe('ready');
|
||||
expect(historicSqlReadinessProbe).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
projectDir: tempDir,
|
||||
connectionId: 'analytics',
|
||||
connection: expect.objectContaining({ driver: 'bigquery' }),
|
||||
}),
|
||||
);
|
||||
expect(io.stdout()).toContain('Query history probe...');
|
||||
expect(io.stdout()).toContain(
|
||||
'BigQuery principal cannot read INFORMATION_SCHEMA.JOBS_BY_PROJECT',
|
||||
);
|
||||
expect(io.stdout()).toContain('roles/bigquery.resourceViewer');
|
||||
expect(io.stdout()).toContain('bigquery.jobs.listAll');
|
||||
expect(io.stdout()).toContain('Setup written; query history will be skipped until fixed.');
|
||||
});
|
||||
|
||||
it('enables query history on an existing Postgres connection', async () => {
|
||||
await writeFile(
|
||||
join(tempDir, 'ktx.yaml'),
|
||||
|
|
@ -2403,7 +2530,15 @@ describe('setup databases step', () => {
|
|||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe: vi.fn(async () => ({ ok: true, lines: [' OK pg_stat_statements ready (PostgreSQL 16.4)'] })),
|
||||
historicSqlReadinessProbe: vi.fn(async () => {
|
||||
const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements');
|
||||
return {
|
||||
ok: true as const,
|
||||
dialect: 'postgres' as const,
|
||||
runner,
|
||||
result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] },
|
||||
};
|
||||
}),
|
||||
},
|
||||
);
|
||||
|
||||
|
|
@ -2471,7 +2606,15 @@ describe('setup databases step', () => {
|
|||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe: vi.fn(async () => ({ ok: true, lines: [] })),
|
||||
historicSqlReadinessProbe: vi.fn(async () => {
|
||||
const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements');
|
||||
return {
|
||||
ok: true as const,
|
||||
dialect: 'postgres' as const,
|
||||
runner,
|
||||
result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] },
|
||||
};
|
||||
}),
|
||||
},
|
||||
),
|
||||
).resolves.toMatchObject({ status: 'ready' });
|
||||
|
|
@ -2498,13 +2641,18 @@ describe('setup databases step', () => {
|
|||
|
||||
it('prints a non-blocking Postgres query history probe failure after connection test succeeds', async () => {
|
||||
const io = makeIo();
|
||||
const historicSqlProbe = vi.fn(async () => ({
|
||||
ok: false,
|
||||
lines: [
|
||||
' FAIL pg_stat_statements extension is not installed in the connection database',
|
||||
' Fix: Run (against this database): CREATE EXTENSION pg_stat_statements;',
|
||||
" Fix: Ensure shared_preload_libraries includes 'pg_stat_statements'.",
|
||||
],
|
||||
const runner = {
|
||||
...fakeHistoricSqlRunner('postgres', 'pg_stat_statements'),
|
||||
fixAdvice: () => ({
|
||||
failHeadline: 'pg_stat_statements extension is not installed in the connection database',
|
||||
remediation: 'Run (against this database): CREATE EXTENSION pg_stat_statements;',
|
||||
}),
|
||||
};
|
||||
const historicSqlReadinessProbe = vi.fn(async () => ({
|
||||
ok: false as const,
|
||||
dialect: 'postgres' as const,
|
||||
runner,
|
||||
error: new Error('missing extension'),
|
||||
}));
|
||||
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
|
|
@ -2522,16 +2670,16 @@ describe('setup databases step', () => {
|
|||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe,
|
||||
historicSqlReadinessProbe,
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe('ready');
|
||||
expect(historicSqlProbe).toHaveBeenCalledWith(
|
||||
expect(historicSqlReadinessProbe).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
projectDir: tempDir,
|
||||
connectionId: 'warehouse',
|
||||
dialect: 'postgres',
|
||||
connection: expect.objectContaining({ driver: 'postgres' }),
|
||||
}),
|
||||
);
|
||||
expect(io.stdout()).toContain('Query history probe...');
|
||||
|
|
@ -2542,12 +2690,19 @@ describe('setup databases step', () => {
|
|||
|
||||
it('prints a non-blocking Snowflake query history probe failure with the grants remediation', async () => {
|
||||
const io = makeIo();
|
||||
const historicSqlProbe = vi.fn(async () => ({
|
||||
ok: false,
|
||||
lines: [
|
||||
' FAIL Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
' Fix: Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE <connection role>;',
|
||||
],
|
||||
const runner = {
|
||||
...fakeHistoricSqlRunner('snowflake', 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'),
|
||||
fixAdvice: () => ({
|
||||
failHeadline: 'Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
remediation:
|
||||
'Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE <connection role>;',
|
||||
}),
|
||||
};
|
||||
const historicSqlReadinessProbe = vi.fn(async () => ({
|
||||
ok: false as const,
|
||||
dialect: 'snowflake' as const,
|
||||
runner,
|
||||
error: new Error('role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'),
|
||||
}));
|
||||
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
|
|
@ -2564,7 +2719,7 @@ describe('setup databases step', () => {
|
|||
{
|
||||
testConnection: vi.fn(async () => 0),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe,
|
||||
historicSqlReadinessProbe,
|
||||
prompts: makePromptAdapter({
|
||||
textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''],
|
||||
passwordValues: ['env:SNOWFLAKE_PASSWORD'],
|
||||
|
|
@ -2573,11 +2728,11 @@ describe('setup databases step', () => {
|
|||
);
|
||||
|
||||
expect(result.status).toBe('ready');
|
||||
expect(historicSqlProbe).toHaveBeenCalledWith(
|
||||
expect(historicSqlReadinessProbe).toHaveBeenCalledWith(
|
||||
expect.objectContaining({
|
||||
projectDir: tempDir,
|
||||
connectionId: 'warehouse',
|
||||
dialect: 'snowflake',
|
||||
connection: expect.objectContaining({ driver: 'snowflake' }),
|
||||
}),
|
||||
);
|
||||
expect(io.stdout()).toContain('Query history probe...');
|
||||
|
|
@ -2588,7 +2743,15 @@ describe('setup databases step', () => {
|
|||
|
||||
it('does not run the query history probe when the regular connection test fails', async () => {
|
||||
const io = makeIo();
|
||||
const historicSqlProbe = vi.fn(async () => ({ ok: true, lines: [] }));
|
||||
const historicSqlReadinessProbe = vi.fn(async () => {
|
||||
const runner = fakeHistoricSqlRunner('postgres', 'pg_stat_statements');
|
||||
return {
|
||||
ok: true as const,
|
||||
dialect: 'postgres' as const,
|
||||
runner,
|
||||
result: { pgServerVersion: 'PostgreSQL 16.4', warnings: [], info: [] },
|
||||
};
|
||||
});
|
||||
|
||||
const result = await runKtxSetupDatabasesStep(
|
||||
{
|
||||
|
|
@ -2605,12 +2768,12 @@ describe('setup databases step', () => {
|
|||
{
|
||||
testConnection: vi.fn(async () => 1),
|
||||
scanConnection: vi.fn(async () => 0),
|
||||
historicSqlProbe,
|
||||
historicSqlReadinessProbe,
|
||||
},
|
||||
);
|
||||
|
||||
expect(result.status).toBe('failed');
|
||||
expect(historicSqlProbe).not.toHaveBeenCalled();
|
||||
expect(historicSqlReadinessProbe).not.toHaveBeenCalled();
|
||||
});
|
||||
|
||||
it('returns missing input when non-interactive database flags are incomplete', async () => {
|
||||
|
|
|
|||
|
|
@ -3,7 +3,13 @@ import { readFile, writeFile } from 'node:fs/promises';
|
|||
import { delimiter, dirname, join } from 'node:path';
|
||||
import { fileURLToPath } from 'node:url';
|
||||
import { promisify } from 'node:util';
|
||||
import { queryHistoryDialectForConnection } from './context/ingest/adapters/historic-sql/connection-dialect.js';
|
||||
import type { HistoricSqlDialect } from './context/ingest/adapters/historic-sql/types.js';
|
||||
import {
|
||||
runHistoricSqlReadinessProbe,
|
||||
type HistoricSqlProbeOutcome,
|
||||
type HistoricSqlReadinessProbe,
|
||||
} from './context/ingest/historic-sql-probes.js';
|
||||
import { type KtxProjectConnectionConfig, serializeKtxProjectConfig } from './context/project/config.js';
|
||||
import { loadKtxProject } from './context/project/project.js';
|
||||
import { markKtxSetupStateStepComplete, setKtxSetupDatabaseConnectionIds } from './context/project/setup-config.js';
|
||||
|
|
@ -84,19 +90,11 @@ export interface KtxSetupDatabasesPromptAdapter {
|
|||
cancel(message: string): void;
|
||||
}
|
||||
|
||||
interface KtxSetupHistoricSqlProbeInput {
|
||||
projectDir: string;
|
||||
connectionId: string;
|
||||
dialect: HistoricSqlDialect;
|
||||
}
|
||||
|
||||
interface KtxSetupHistoricSqlProbeResult {
|
||||
ok: boolean;
|
||||
lines: string[];
|
||||
}
|
||||
|
||||
type KtxSetupHistoricSqlProbe = (input: KtxSetupHistoricSqlProbeInput) => Promise<KtxSetupHistoricSqlProbeResult>;
|
||||
|
||||
export interface KtxSetupDatabasesDeps {
|
||||
prompts?: KtxSetupDatabasesPromptAdapter;
|
||||
testConnection?: (projectDir: string, connectionId: string, io: KtxCliIo) => Promise<number>;
|
||||
|
|
@ -105,7 +103,7 @@ export interface KtxSetupDatabasesDeps {
|
|||
listSchemas?: (projectDir: string, connectionId: string) => Promise<string[]>;
|
||||
listTables?: (projectDir: string, connectionId: string, schemas?: string[]) => Promise<KtxTableListEntry[]>;
|
||||
pickDatabaseScope?: (args: PickDatabaseScopeArgs, io: KtxCliIo) => Promise<DatabaseScopePickResult>;
|
||||
historicSqlProbe?: KtxSetupHistoricSqlProbe;
|
||||
historicSqlReadinessProbe?: HistoricSqlReadinessProbe;
|
||||
}
|
||||
|
||||
const DRIVER_OPTIONS: Array<{ value: KtxSetupDatabaseDriver; label: string }> = [
|
||||
|
|
@ -334,121 +332,24 @@ function migrateLegacyHistoricSqlConnection(connection: KtxProjectConnectionConf
|
|||
return withQueryHistoryConfig(connection, queryHistory);
|
||||
}
|
||||
|
||||
function historicSqlProbeFailureLines(error: unknown): string[] {
|
||||
if (error instanceof Error && error.name === 'HistoricSqlExtensionMissingError') {
|
||||
return [
|
||||
' FAIL pg_stat_statements extension is not installed in the connection database',
|
||||
' Fix: Run (against this database): CREATE EXTENSION pg_stat_statements;',
|
||||
" Fix: Ensure shared_preload_libraries includes 'pg_stat_statements'.",
|
||||
];
|
||||
function setupHistoricSqlProbeResult(
|
||||
outcome: HistoricSqlProbeOutcome | null,
|
||||
): KtxSetupHistoricSqlProbeResult {
|
||||
if (!outcome) {
|
||||
return { ok: true, lines: [] };
|
||||
}
|
||||
if (error instanceof Error && error.name === 'HistoricSqlGrantsMissingError') {
|
||||
const dialect = (error as { dialect?: unknown }).dialect;
|
||||
if (dialect === 'snowflake') {
|
||||
return [
|
||||
' FAIL Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
' Fix: Run (as ACCOUNTADMIN): GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE <connection role>;',
|
||||
];
|
||||
}
|
||||
return [
|
||||
' FAIL Postgres connection role lacks pg_read_all_stats',
|
||||
' Fix: Run: GRANT pg_read_all_stats TO <connection role>;',
|
||||
];
|
||||
}
|
||||
if (error instanceof Error && error.name === 'HistoricSqlVersionUnsupportedError') {
|
||||
return [` FAIL ${error.message}`];
|
||||
}
|
||||
return [` FAIL Query history probe failed: ${error instanceof Error ? error.message : String(error)}`];
|
||||
}
|
||||
|
||||
async function defaultHistoricSqlProbe(input: KtxSetupHistoricSqlProbeInput): Promise<KtxSetupHistoricSqlProbeResult> {
|
||||
if (input.dialect === 'postgres') {
|
||||
return probePostgresHistoricSql(input);
|
||||
}
|
||||
if (input.dialect === 'snowflake') {
|
||||
return probeSnowflakeHistoricSql(input);
|
||||
}
|
||||
return { ok: true, lines: [] };
|
||||
}
|
||||
|
||||
async function probePostgresHistoricSql(
|
||||
input: KtxSetupHistoricSqlProbeInput,
|
||||
): Promise<KtxSetupHistoricSqlProbeResult> {
|
||||
const project = await loadKtxProject({ projectDir: input.projectDir });
|
||||
const connection = project.config.connections[input.connectionId];
|
||||
const [{ PostgresPgssReader }, { KtxPostgresHistoricSqlQueryClient }, { isKtxPostgresConnectionConfig }] =
|
||||
await Promise.all([
|
||||
import('./context/ingest/adapters/historic-sql/postgres-pgss-reader.js'),
|
||||
import('./connectors/postgres/historic-sql-query-client.js'),
|
||||
import('./connectors/postgres/connector.js'),
|
||||
]);
|
||||
|
||||
const postgresConnection = connection as Parameters<typeof isKtxPostgresConnectionConfig>[0];
|
||||
if (!isKtxPostgresConnectionConfig(postgresConnection)) {
|
||||
return {
|
||||
ok: false,
|
||||
lines: [` FAIL Connection ${input.connectionId} is not a native Postgres connection.`],
|
||||
};
|
||||
}
|
||||
|
||||
const client = new KtxPostgresHistoricSqlQueryClient({
|
||||
connectionId: input.connectionId,
|
||||
connection: postgresConnection,
|
||||
});
|
||||
try {
|
||||
const result = await new PostgresPgssReader().probe(client);
|
||||
if (outcome.ok) {
|
||||
const { detail, warnings } = outcome.runner.formatSuccessDetail(outcome.result);
|
||||
return {
|
||||
ok: true,
|
||||
lines: [
|
||||
` OK pg_stat_statements ready (${result.pgServerVersion})`,
|
||||
...result.warnings.map((warning: string) => ` ! ${warning}`),
|
||||
],
|
||||
};
|
||||
} catch (error) {
|
||||
return { ok: false, lines: historicSqlProbeFailureLines(error) };
|
||||
} finally {
|
||||
await client.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
async function probeSnowflakeHistoricSql(
|
||||
input: KtxSetupHistoricSqlProbeInput,
|
||||
): Promise<KtxSetupHistoricSqlProbeResult> {
|
||||
const project = await loadKtxProject({ projectDir: input.projectDir });
|
||||
const connection = project.config.connections[input.connectionId];
|
||||
const [{ SnowflakeHistoricSqlQueryHistoryReader }, { KtxSnowflakeHistoricSqlQueryClient }, { isKtxSnowflakeConnectionConfig }] =
|
||||
await Promise.all([
|
||||
import('./context/ingest/adapters/historic-sql/snowflake-query-history-reader.js'),
|
||||
import('./connectors/snowflake/historic-sql-query-client.js'),
|
||||
import('./connectors/snowflake/connector.js'),
|
||||
]);
|
||||
|
||||
if (!isKtxSnowflakeConnectionConfig(connection)) {
|
||||
return {
|
||||
ok: false,
|
||||
lines: [` FAIL Connection ${input.connectionId} is not a native Snowflake connection.`],
|
||||
lines: [` OK ${detail}`, ...warnings.map((warning) => ` ! ${warning}`)],
|
||||
};
|
||||
}
|
||||
|
||||
const client = new KtxSnowflakeHistoricSqlQueryClient({
|
||||
connectionId: input.connectionId,
|
||||
connection,
|
||||
projectDir: input.projectDir,
|
||||
});
|
||||
try {
|
||||
const result = await new SnowflakeHistoricSqlQueryHistoryReader().probe(client);
|
||||
return {
|
||||
ok: true,
|
||||
lines: [
|
||||
' OK SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY accessible',
|
||||
...result.warnings.map((warning: string) => ` ! ${warning}`),
|
||||
],
|
||||
};
|
||||
} catch (error) {
|
||||
return { ok: false, lines: historicSqlProbeFailureLines(error) };
|
||||
} finally {
|
||||
await client.cleanup();
|
||||
}
|
||||
const advice = outcome.runner.fixAdvice(outcome.error);
|
||||
return {
|
||||
ok: false,
|
||||
lines: [` FAIL ${advice.failHeadline}`, ` Fix: ${advice.remediation}`],
|
||||
};
|
||||
}
|
||||
|
||||
async function defaultListSchemas(projectDir: string, connectionId: string): Promise<string[]> {
|
||||
|
|
@ -1770,23 +1671,27 @@ async function maybeRunHistoricSqlSetupProbe(input: {
|
|||
const project = await loadKtxProject({ projectDir: input.projectDir });
|
||||
const connection = project.config.connections[input.connectionId];
|
||||
const queryHistory = queryHistoryConfigRecord(connection) ?? historicSqlConfigRecord(connection);
|
||||
const driver = normalizeDriver(connection?.driver);
|
||||
if (queryHistory?.enabled !== true) {
|
||||
return;
|
||||
}
|
||||
const dialect: 'postgres' | 'snowflake' | null =
|
||||
driver === 'postgres' ? 'postgres' : driver === 'snowflake' ? 'snowflake' : null;
|
||||
if (!connection) {
|
||||
return;
|
||||
}
|
||||
const dialect = queryHistoryDialectForConnection(connection);
|
||||
if (!dialect) {
|
||||
return;
|
||||
}
|
||||
|
||||
input.io.stdout.write('│ Query history probe...\n');
|
||||
const probe = input.deps.historicSqlProbe ?? defaultHistoricSqlProbe;
|
||||
const result = await probe({
|
||||
projectDir: input.projectDir,
|
||||
connectionId: input.connectionId,
|
||||
dialect,
|
||||
});
|
||||
const probe = input.deps.historicSqlReadinessProbe ?? runHistoricSqlReadinessProbe;
|
||||
const result = setupHistoricSqlProbeResult(
|
||||
await probe({
|
||||
projectDir: input.projectDir,
|
||||
connectionId: input.connectionId,
|
||||
connection,
|
||||
env: process.env,
|
||||
}),
|
||||
);
|
||||
for (const line of result.lines) {
|
||||
input.io.stdout.write(`│${line}\n`);
|
||||
}
|
||||
|
|
|
|||
|
|
@ -197,26 +197,58 @@ function withMysqlQueryHistory(config: KtxProjectConfig): KtxProjectConfig {
|
|||
};
|
||||
}
|
||||
|
||||
function fakeStatusRunner(
|
||||
dialect: 'postgres' | 'snowflake' | 'bigquery',
|
||||
catalogName: string,
|
||||
) {
|
||||
return {
|
||||
dialect,
|
||||
catalogName,
|
||||
async run() {
|
||||
return { warnings: [], info: [] };
|
||||
},
|
||||
formatSuccessDetail(result: unknown) {
|
||||
const typed = result as { warnings: string[]; info?: string[]; pgServerVersion?: string };
|
||||
const info = typed.info && typed.info.length > 0 ? `; ${typed.info.join('; ')}` : '';
|
||||
const base =
|
||||
dialect === 'postgres'
|
||||
? `pg_stat_statements ready (${typed.pgServerVersion ?? 'PostgreSQL 16.4'})`
|
||||
: `${catalogName} ready`;
|
||||
return { detail: `${base}${info}`, warnings: typed.warnings };
|
||||
},
|
||||
fixAdvice(error: unknown) {
|
||||
return {
|
||||
failHeadline: error instanceof Error ? error.message : String(error),
|
||||
remediation: 'Fix query-history grants.',
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
describe('buildProjectStatus query history dispatch', () => {
|
||||
it('runs the snowflake probe for snowflake connections, not the postgres one', async () => {
|
||||
let postgresCalls = 0;
|
||||
let snowflakeCalls = 0;
|
||||
it('runs the shared probe for snowflake connections', async () => {
|
||||
let probeCalls = 0;
|
||||
const runner = fakeStatusRunner(
|
||||
'snowflake',
|
||||
'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
);
|
||||
const project = projectWithConfig(withSnowflakeQueryHistory(baseProjectConfig()));
|
||||
|
||||
const status = await buildProjectStatus(project, {
|
||||
claudeCodeAuthProbe: stubClaudeCodeAuthProbe,
|
||||
postgresQueryHistoryProbe: async () => {
|
||||
postgresCalls += 1;
|
||||
throw new Error('postgres probe should not run for snowflake');
|
||||
},
|
||||
snowflakeQueryHistoryProbe: async () => {
|
||||
snowflakeCalls += 1;
|
||||
return { warnings: [], info: [] };
|
||||
queryHistoryReadinessProbe: async (input) => {
|
||||
probeCalls += 1;
|
||||
expect(input.connectionId).toBe('warehouse');
|
||||
return {
|
||||
ok: true,
|
||||
dialect: 'snowflake',
|
||||
runner,
|
||||
result: { warnings: [], info: [] },
|
||||
};
|
||||
},
|
||||
});
|
||||
|
||||
expect(postgresCalls).toBe(0);
|
||||
expect(snowflakeCalls).toBe(1);
|
||||
expect(probeCalls).toBe(1);
|
||||
expect(status.queryHistory).toHaveLength(1);
|
||||
expect(status.queryHistory[0]).toMatchObject({
|
||||
connection: 'warehouse',
|
||||
|
|
@ -231,19 +263,21 @@ describe('buildProjectStatus query history dispatch', () => {
|
|||
|
||||
it('reports snowflake probe failures with the reader-provided remediation', async () => {
|
||||
const project = projectWithConfig(withSnowflakeQueryHistory(baseProjectConfig()));
|
||||
const { HistoricSqlGrantsMissingError } = await import(
|
||||
'./context/ingest/adapters/historic-sql/errors.js'
|
||||
);
|
||||
|
||||
const status = await buildProjectStatus(project, {
|
||||
claudeCodeAuthProbe: stubClaudeCodeAuthProbe,
|
||||
snowflakeQueryHistoryProbe: async () => {
|
||||
throw new HistoricSqlGrantsMissingError({
|
||||
dialect: 'snowflake',
|
||||
message: 'role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
remediation: 'GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ktx;',
|
||||
});
|
||||
},
|
||||
queryHistoryReadinessProbe: async () => ({
|
||||
ok: false,
|
||||
dialect: 'snowflake',
|
||||
runner: {
|
||||
...fakeStatusRunner('snowflake', 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'),
|
||||
fixAdvice: () => ({
|
||||
failHeadline: 'Snowflake role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY',
|
||||
remediation: 'GRANT IMPORTED PRIVILEGES ON DATABASE SNOWFLAKE TO ROLE ktx;',
|
||||
}),
|
||||
},
|
||||
error: new Error('role cannot read SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY'),
|
||||
}),
|
||||
});
|
||||
|
||||
expect(status.queryHistory[0]).toMatchObject({
|
||||
|
|
@ -257,18 +291,25 @@ describe('buildProjectStatus query history dispatch', () => {
|
|||
});
|
||||
|
||||
it('runs the bigquery probe for bigquery connections', async () => {
|
||||
let bigqueryCalls = 0;
|
||||
let probeCalls = 0;
|
||||
const runner = fakeStatusRunner('bigquery', 'INFORMATION_SCHEMA.JOBS_BY_PROJECT');
|
||||
const project = projectWithConfig(withBigQueryQueryHistory(baseProjectConfig()));
|
||||
|
||||
const status = await buildProjectStatus(project, {
|
||||
claudeCodeAuthProbe: stubClaudeCodeAuthProbe,
|
||||
bigqueryQueryHistoryProbe: async () => {
|
||||
bigqueryCalls += 1;
|
||||
return { warnings: [], info: [] };
|
||||
queryHistoryReadinessProbe: async (input) => {
|
||||
probeCalls += 1;
|
||||
expect(input.connectionId).toBe('bq');
|
||||
return {
|
||||
ok: true,
|
||||
dialect: 'bigquery',
|
||||
runner,
|
||||
result: { warnings: [], info: [] },
|
||||
};
|
||||
},
|
||||
});
|
||||
|
||||
expect(bigqueryCalls).toBe(1);
|
||||
expect(probeCalls).toBe(1);
|
||||
expect(status.queryHistory[0]).toMatchObject({
|
||||
connection: 'bq',
|
||||
driver: 'bigquery',
|
||||
|
|
@ -283,7 +324,7 @@ describe('buildProjectStatus query history dispatch', () => {
|
|||
|
||||
const status = await buildProjectStatus(project, {
|
||||
claudeCodeAuthProbe: stubClaudeCodeAuthProbe,
|
||||
postgresQueryHistoryProbe: async () => {
|
||||
queryHistoryReadinessProbe: async () => {
|
||||
throw new Error('postgres probe must not run for mysql');
|
||||
},
|
||||
});
|
||||
|
|
@ -306,7 +347,7 @@ describe('buildProjectStatus query history dispatch', () => {
|
|||
describe('buildProjectStatus --fast', () => {
|
||||
it('skips claude-code probe and Postgres query-history probe', async () => {
|
||||
let claudeProbeCalls = 0;
|
||||
let pgProbeCalls = 0;
|
||||
let queryHistoryProbeCalls = 0;
|
||||
const project = projectWithConfig(withPostgresQueryHistory(baseProjectConfig()));
|
||||
|
||||
const status = await buildProjectStatus(project, {
|
||||
|
|
@ -316,14 +357,14 @@ describe('buildProjectStatus --fast', () => {
|
|||
claudeProbeCalls += 1;
|
||||
return { ok: true };
|
||||
},
|
||||
postgresQueryHistoryProbe: async () => {
|
||||
pgProbeCalls += 1;
|
||||
queryHistoryReadinessProbe: async () => {
|
||||
queryHistoryProbeCalls += 1;
|
||||
throw new Error('should not be called');
|
||||
},
|
||||
});
|
||||
|
||||
expect(claudeProbeCalls).toBe(0);
|
||||
expect(pgProbeCalls).toBe(0);
|
||||
expect(queryHistoryProbeCalls).toBe(0);
|
||||
expect(status.llm.status).toBe('skipped');
|
||||
expect(status.llm.detail).toMatch(/--fast/);
|
||||
expect(status.queryHistory).toHaveLength(1);
|
||||
|
|
@ -340,7 +381,7 @@ describe('buildProjectStatus --fast', () => {
|
|||
env: { ANALYTICS_DATABASE_URL: 'postgres://example' },
|
||||
fast: true,
|
||||
claudeCodeAuthProbe: stubClaudeCodeAuthProbe,
|
||||
postgresQueryHistoryProbe: async () => {
|
||||
queryHistoryReadinessProbe: async () => {
|
||||
throw new Error('should not be called');
|
||||
},
|
||||
});
|
||||
|
|
|
|||
|
|
@ -1,14 +1,18 @@
|
|||
import { stat as statAsync, readdir as readdirAsync } from 'node:fs/promises';
|
||||
import { basename, join } from 'node:path';
|
||||
import { runClaudeCodeAuthProbe } from './context/llm/claude-code-runtime.js';
|
||||
import type { KtxConfigIssue, KtxProjectConfig, KtxProjectConnectionConfig, KtxProjectEmbeddingConfig, KtxProjectLlmConfig } from './context/project/config.js';
|
||||
import type { KtxConfigIssue, KtxProjectConfig, KtxProjectEmbeddingConfig, KtxProjectLlmConfig } from './context/project/config.js';
|
||||
import type { KtxLocalProject } from './context/project/project.js';
|
||||
import { ktxLocalStateDbPath } from './context/project/local-state-db.js';
|
||||
import type { PostgresPgssProbeResult } from './context/ingest/adapters/historic-sql/types.js';
|
||||
import {
|
||||
isQueryHistoryEnabled,
|
||||
queryHistoryDialectForConnection,
|
||||
} from './context/ingest/adapters/historic-sql/connection-dialect.js';
|
||||
import {
|
||||
historicSqlProbeCatalogName,
|
||||
runHistoricSqlReadinessProbe,
|
||||
type HistoricSqlReadinessProbe,
|
||||
} from './context/ingest/historic-sql-probes.js';
|
||||
import {
|
||||
formatClaudeCodePromptCachingFix,
|
||||
formatClaudeCodePromptCachingWarning,
|
||||
|
|
@ -178,6 +182,13 @@ function resolveRef(value: unknown, env: NodeJS.ProcessEnv): { resolved: string;
|
|||
return { resolved: trimmed, via: 'literal' };
|
||||
}
|
||||
|
||||
function failureDetail(error: unknown): string {
|
||||
if (error instanceof Error && error.message.trim().length > 0) {
|
||||
return error.message.trim().split('\n')[0] ?? error.message.trim();
|
||||
}
|
||||
return String(error);
|
||||
}
|
||||
|
||||
function envHint(value: unknown): string | undefined {
|
||||
if (typeof value === 'string' && value.trim().startsWith('env:')) {
|
||||
return value.trim().slice(4).trim();
|
||||
|
|
@ -401,232 +412,6 @@ function buildConnectionStatus(
|
|||
}
|
||||
}
|
||||
|
||||
interface QueryHistoryProbeInput {
|
||||
projectDir: string;
|
||||
connectionId: string;
|
||||
connection: KtxProjectConnectionConfig;
|
||||
env: NodeJS.ProcessEnv;
|
||||
}
|
||||
|
||||
interface GenericProbeResult {
|
||||
warnings: string[];
|
||||
info?: string[];
|
||||
}
|
||||
|
||||
type PostgresQueryHistoryProbe = (input: QueryHistoryProbeInput) => Promise<PostgresPgssProbeResult>;
|
||||
type SnowflakeQueryHistoryProbe = (input: QueryHistoryProbeInput) => Promise<GenericProbeResult>;
|
||||
type BigQueryQueryHistoryProbe = (input: QueryHistoryProbeInput) => Promise<GenericProbeResult>;
|
||||
|
||||
function failureDetail(error: unknown): string {
|
||||
if (error instanceof Error && error.message.trim().length > 0) {
|
||||
return error.message.trim().split('\n')[0] ?? error.message.trim();
|
||||
}
|
||||
return String(error);
|
||||
}
|
||||
|
||||
function postgresReadinessDetail(result: PostgresPgssProbeResult): string {
|
||||
const warningText = result.warnings.length > 0 ? ` with warnings: ${result.warnings.join('; ')}` : '';
|
||||
const info = result.info ?? [];
|
||||
const infoText = info.length > 0 ? `; info: ${info.join('; ')}` : '';
|
||||
return `pg_stat_statements ready (${result.pgServerVersion})${warningText}${infoText}`;
|
||||
}
|
||||
|
||||
function genericReadinessDetail(label: string, result: GenericProbeResult): string {
|
||||
const warningText = result.warnings.length > 0 ? ` with warnings: ${result.warnings.join('; ')}` : '';
|
||||
const info = result.info ?? [];
|
||||
const infoText = info.length > 0 ? `; info: ${info.join('; ')}` : '';
|
||||
return `${label} ready${warningText}${infoText}`;
|
||||
}
|
||||
|
||||
function probeFailureFix(error: unknown, dialect: string, connectionId: string, projectDir: string): string {
|
||||
if (error instanceof Error && error.name === 'HistoricSqlExtensionMissingError' && 'remediation' in error) {
|
||||
return String(error.remediation);
|
||||
}
|
||||
if (error instanceof Error && error.name === 'HistoricSqlGrantsMissingError' && 'remediation' in error) {
|
||||
return String(error.remediation);
|
||||
}
|
||||
if (error instanceof Error && error.name === 'HistoricSqlVersionUnsupportedError') {
|
||||
return 'Use PostgreSQL 14 or newer, or disable query history for this connection';
|
||||
}
|
||||
return `Fix connections.${connectionId} ${dialect} settings, then rerun \`ktx status --project-dir ${projectDir}\``;
|
||||
}
|
||||
|
||||
async function defaultPostgresQueryHistoryProbe(
|
||||
input: QueryHistoryProbeInput,
|
||||
): Promise<PostgresPgssProbeResult> {
|
||||
const [{ PostgresPgssReader }, { KtxPostgresHistoricSqlQueryClient }, { isKtxPostgresConnectionConfig }] =
|
||||
await Promise.all([
|
||||
import('./context/ingest/adapters/historic-sql/postgres-pgss-reader.js'),
|
||||
import('./connectors/postgres/historic-sql-query-client.js'),
|
||||
import('./connectors/postgres/connector.js'),
|
||||
]);
|
||||
|
||||
const inputDriver = input.connection.driver ?? 'unknown';
|
||||
if (!isKtxPostgresConnectionConfig(input.connection)) {
|
||||
throw new Error(`Native PostgreSQL connector cannot run driver "${inputDriver}"`);
|
||||
}
|
||||
|
||||
const client = new KtxPostgresHistoricSqlQueryClient({
|
||||
connectionId: input.connectionId,
|
||||
connection: input.connection,
|
||||
env: input.env,
|
||||
});
|
||||
try {
|
||||
return await new PostgresPgssReader().probe(client);
|
||||
} finally {
|
||||
await client.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
async function defaultSnowflakeQueryHistoryProbe(
|
||||
input: QueryHistoryProbeInput,
|
||||
): Promise<GenericProbeResult> {
|
||||
const [{ SnowflakeHistoricSqlQueryHistoryReader }, { KtxSnowflakeHistoricSqlQueryClient }, { isKtxSnowflakeConnectionConfig }] =
|
||||
await Promise.all([
|
||||
import('./context/ingest/adapters/historic-sql/snowflake-query-history-reader.js'),
|
||||
import('./connectors/snowflake/historic-sql-query-client.js'),
|
||||
import('./connectors/snowflake/connector.js'),
|
||||
]);
|
||||
|
||||
const inputDriver = input.connection.driver ?? 'unknown';
|
||||
if (!isKtxSnowflakeConnectionConfig(input.connection)) {
|
||||
throw new Error(`Native Snowflake connector cannot run driver "${inputDriver}"`);
|
||||
}
|
||||
|
||||
const client = new KtxSnowflakeHistoricSqlQueryClient({
|
||||
connectionId: input.connectionId,
|
||||
connection: input.connection,
|
||||
projectDir: input.projectDir,
|
||||
env: input.env,
|
||||
});
|
||||
try {
|
||||
return await new SnowflakeHistoricSqlQueryHistoryReader().probe(client);
|
||||
} finally {
|
||||
await client.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
async function defaultBigQueryQueryHistoryProbe(
|
||||
input: QueryHistoryProbeInput,
|
||||
): Promise<GenericProbeResult> {
|
||||
const [
|
||||
{ BigQueryHistoricSqlQueryHistoryReader },
|
||||
{ KtxBigQueryScanConnector, isKtxBigQueryConnectionConfig },
|
||||
{ resolveKtxConfigReference },
|
||||
] = await Promise.all([
|
||||
import('./context/ingest/adapters/historic-sql/bigquery-query-history-reader.js'),
|
||||
import('./connectors/bigquery/connector.js'),
|
||||
import('./context/core/config-reference.js'),
|
||||
]);
|
||||
|
||||
const inputDriver = input.connection.driver ?? 'unknown';
|
||||
if (!isKtxBigQueryConnectionConfig(input.connection)) {
|
||||
throw new Error(`Native BigQuery connector cannot run driver "${inputDriver}"`);
|
||||
}
|
||||
|
||||
const rawCredentials = typeof input.connection.credentials_json === 'string' ? input.connection.credentials_json : '';
|
||||
const resolvedCredentials = resolveKtxConfigReference(rawCredentials, input.env);
|
||||
if (!resolvedCredentials) {
|
||||
throw new Error(`Query history BigQuery connection ${input.connectionId} requires credentials_json`);
|
||||
}
|
||||
const parsed = JSON.parse(resolvedCredentials) as { project_id?: unknown };
|
||||
if (typeof parsed.project_id !== 'string' || parsed.project_id.trim().length === 0) {
|
||||
throw new Error(`Query history BigQuery connection ${input.connectionId} requires credentials_json.project_id`);
|
||||
}
|
||||
const region =
|
||||
typeof input.connection.location === 'string' && input.connection.location.trim().length > 0
|
||||
? input.connection.location.trim()
|
||||
: 'us';
|
||||
|
||||
const connector = new KtxBigQueryScanConnector({
|
||||
connectionId: input.connectionId,
|
||||
connection: input.connection,
|
||||
});
|
||||
try {
|
||||
return await new BigQueryHistoricSqlQueryHistoryReader({
|
||||
projectId: parsed.project_id,
|
||||
region,
|
||||
}).probe({
|
||||
async executeQuery(sql: string) {
|
||||
const result = await connector.executeReadOnly({ connectionId: input.connectionId, sql }, {} as never);
|
||||
return {
|
||||
headers: result.headers,
|
||||
rows: result.rows,
|
||||
totalRows: result.totalRows,
|
||||
};
|
||||
},
|
||||
});
|
||||
} finally {
|
||||
await connector.cleanup();
|
||||
}
|
||||
}
|
||||
|
||||
interface DispatchedProbe {
|
||||
label: string;
|
||||
spinnerLabel: string;
|
||||
fastSkipDetail: string;
|
||||
run: () => Promise<{ status: ProjectStatusLevel; detail: string; fix?: string }>;
|
||||
}
|
||||
|
||||
function postgresProbeDispatch(
|
||||
input: QueryHistoryProbeInput,
|
||||
probe: PostgresQueryHistoryProbe,
|
||||
): DispatchedProbe {
|
||||
return {
|
||||
label: 'postgres',
|
||||
spinnerLabel: `Probing pg_stat_statements on ${input.connectionId}`,
|
||||
fastSkipDetail: 'pg_stat_statements probe skipped (--fast)',
|
||||
run: async () => {
|
||||
const result = await probe(input);
|
||||
return {
|
||||
status: result.warnings.length > 0 ? 'warn' : 'ok',
|
||||
detail: postgresReadinessDetail(result),
|
||||
...(result.warnings.length > 0
|
||||
? {
|
||||
fix: `Update the Postgres parameter group or config, then rerun \`ktx status --project-dir ${input.projectDir}\``,
|
||||
}
|
||||
: {}),
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function snowflakeProbeDispatch(
|
||||
input: QueryHistoryProbeInput,
|
||||
probe: SnowflakeQueryHistoryProbe,
|
||||
): DispatchedProbe {
|
||||
return {
|
||||
label: 'snowflake',
|
||||
spinnerLabel: `Probing SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY on ${input.connectionId}`,
|
||||
fastSkipDetail: 'SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY probe skipped (--fast)',
|
||||
run: async () => {
|
||||
const result = await probe(input);
|
||||
return {
|
||||
status: result.warnings.length > 0 ? 'warn' : 'ok',
|
||||
detail: genericReadinessDetail('SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY', result),
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
function bigqueryProbeDispatch(
|
||||
input: QueryHistoryProbeInput,
|
||||
probe: BigQueryQueryHistoryProbe,
|
||||
): DispatchedProbe {
|
||||
return {
|
||||
label: 'bigquery',
|
||||
spinnerLabel: `Probing INFORMATION_SCHEMA.JOBS_BY_PROJECT on ${input.connectionId}`,
|
||||
fastSkipDetail: 'INFORMATION_SCHEMA.JOBS_BY_PROJECT probe skipped (--fast)',
|
||||
run: async () => {
|
||||
const result = await probe(input);
|
||||
return {
|
||||
status: result.warnings.length > 0 ? 'warn' : 'ok',
|
||||
detail: genericReadinessDetail('INFORMATION_SCHEMA.JOBS_BY_PROJECT', result),
|
||||
};
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
async function buildQueryHistoryStatus(
|
||||
project: KtxLocalProject,
|
||||
options: BuildProjectStatusOptions,
|
||||
|
|
@ -635,9 +420,7 @@ async function buildQueryHistoryStatus(
|
|||
.filter(([, connection]) => isQueryHistoryEnabled(connection))
|
||||
.sort(([left], [right]) => left.localeCompare(right));
|
||||
|
||||
const postgresProbe = options.postgresQueryHistoryProbe ?? defaultPostgresQueryHistoryProbe;
|
||||
const snowflakeProbe = options.snowflakeQueryHistoryProbe ?? defaultSnowflakeQueryHistoryProbe;
|
||||
const bigqueryProbe = options.bigqueryQueryHistoryProbe ?? defaultBigQueryQueryHistoryProbe;
|
||||
const probe = options.queryHistoryReadinessProbe ?? runHistoricSqlReadinessProbe;
|
||||
const env = options.env ?? process.env;
|
||||
const statuses: QueryHistoryStatus[] = [];
|
||||
|
||||
|
|
@ -657,18 +440,7 @@ async function buildQueryHistoryStatus(
|
|||
continue;
|
||||
}
|
||||
|
||||
const probeInput: QueryHistoryProbeInput = {
|
||||
projectDir: project.projectDir,
|
||||
connectionId,
|
||||
connection,
|
||||
env,
|
||||
};
|
||||
const dispatched =
|
||||
dialect === 'postgres'
|
||||
? postgresProbeDispatch(probeInput, postgresProbe)
|
||||
: dialect === 'snowflake'
|
||||
? snowflakeProbeDispatch(probeInput, snowflakeProbe)
|
||||
: bigqueryProbeDispatch(probeInput, bigqueryProbe);
|
||||
const catalogName = historicSqlProbeCatalogName(dialect);
|
||||
|
||||
if (options.fast === true) {
|
||||
statuses.push({
|
||||
|
|
@ -676,29 +448,61 @@ async function buildQueryHistoryStatus(
|
|||
driver,
|
||||
dialect,
|
||||
status: 'skipped',
|
||||
detail: dispatched.fastSkipDetail,
|
||||
detail: `${catalogName} probe skipped (--fast)`,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
try {
|
||||
const outcome = await withSpinner(options.useSpinner === true, dispatched.spinnerLabel, dispatched.run);
|
||||
const outcome = await withSpinner(
|
||||
options.useSpinner === true,
|
||||
`Probing ${catalogName} on ${connectionId}`,
|
||||
() =>
|
||||
probe({
|
||||
projectDir: project.projectDir,
|
||||
connectionId,
|
||||
connection,
|
||||
env,
|
||||
}),
|
||||
);
|
||||
|
||||
if (!outcome) {
|
||||
statuses.push({
|
||||
connection: connectionId,
|
||||
driver,
|
||||
dialect,
|
||||
...outcome,
|
||||
});
|
||||
} catch (error) {
|
||||
statuses.push({
|
||||
connection: connectionId,
|
||||
driver,
|
||||
dialect,
|
||||
dialect: driver,
|
||||
status: 'fail',
|
||||
detail: failureDetail(error),
|
||||
fix: probeFailureFix(error, dispatched.label, connectionId, project.projectDir),
|
||||
detail: `query history is not supported for driver "${driver}"`,
|
||||
fix: `Disable connections.${connectionId}.context.queryHistory, or use a postgres, snowflake, or bigquery connection`,
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
if (outcome.ok) {
|
||||
const { detail, warnings } = outcome.runner.formatSuccessDetail(outcome.result);
|
||||
statuses.push({
|
||||
connection: connectionId,
|
||||
driver,
|
||||
dialect,
|
||||
status: warnings.length > 0 ? 'warn' : 'ok',
|
||||
detail,
|
||||
...(dialect === 'postgres' && warnings.length > 0
|
||||
? {
|
||||
fix: `Update the Postgres parameter group or config, then rerun \`ktx status --project-dir ${project.projectDir}\``,
|
||||
}
|
||||
: {}),
|
||||
});
|
||||
continue;
|
||||
}
|
||||
|
||||
const advice = outcome.runner.fixAdvice(outcome.error);
|
||||
statuses.push({
|
||||
connection: connectionId,
|
||||
driver,
|
||||
dialect,
|
||||
status: 'fail',
|
||||
detail: advice.failHeadline,
|
||||
fix: advice.remediation,
|
||||
});
|
||||
}
|
||||
|
||||
return statuses;
|
||||
|
|
@ -882,9 +686,7 @@ function buildVerdict(
|
|||
|
||||
export interface BuildProjectStatusOptions {
|
||||
env?: NodeJS.ProcessEnv;
|
||||
postgresQueryHistoryProbe?: PostgresQueryHistoryProbe;
|
||||
snowflakeQueryHistoryProbe?: SnowflakeQueryHistoryProbe;
|
||||
bigqueryQueryHistoryProbe?: BigQueryQueryHistoryProbe;
|
||||
queryHistoryReadinessProbe?: HistoricSqlReadinessProbe;
|
||||
claudeCodeAuthProbe?: ClaudeCodeAuthProbe;
|
||||
configIssues?: KtxConfigIssue[];
|
||||
fast?: boolean;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue