ktx/packages/cli/src/local-adapters.ts
Andrey Avtomonov 394a985d2a
fix(snowflake): unblock multi-schema ingest and relationship discovery (#204)
* feat(setup): drop redundant Snowflake schema prompt; fall back to free-text on listSchemas failure

Snowflake setup previously asked for a single schema as free text, then
ran a multiselect against the discovered schemas — two schema questions
back-to-back, with the first being only a session bootstrap. The SDK's
`schema` is optional, so the bootstrap step is unnecessary.

- Remove the free-text Snowflake schema prompt; only pass `schema` to
  snowflake-sdk when one is configured.
- When `listSchemas()` fails (e.g. role lacks SHOW SCHEMAS), prompt the
  user for a comma-separated list, persist it as `schema_names`, and use
  it as both the table-list filter and the multiselect default. Applies
  to every driver with a scope-discovery spec, not just Snowflake.
- Update docs to lead with `schema_names`; keep `schema_name` as a
  documented single-schema shorthand.
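
A minimal sketch of the free-text fallback, assuming the answer is split on
commas, trimmed, and de-duplicated. `parseSchemaNames` is a hypothetical
helper for illustration, not a function taken from this change:

```typescript
// Hypothetical helper: turns the free-text answer into a `schema_names` list.
// Trimming, dropping empty entries, and de-duplicating are assumptions.
function parseSchemaNames(answer: string): string[] {
  const names = new Set<string>();
  for (const part of answer.split(',')) {
    const name = part.trim();
    if (name.length > 0) {
      names.add(name);
    }
  }
  return Array.from(names);
}

// The same list would serve as the table-list filter and the multiselect default.
const schemaNames = parseSchemaNames(' ANALYTICS, RAW ,, ANALYTICS ');
console.log(schemaNames); // [ 'ANALYTICS', 'RAW' ]
```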

* fix(snowflake): keep introspecting when primary-key discovery is denied

The PK query joins INFORMATION_SCHEMA.TABLE_CONSTRAINTS and
INFORMATION_SCHEMA.KEY_COLUMN_USAGE, which require grants the
connection role may not have. Previously a 'SQL compilation error:
Object ANALYTICS.INFORMATION_SCHEMA.KEY_COLUMN_USAGE does not exist
or not authorized' aborted the entire introspect — schemas, columns,
and row counts were all discarded over a missing nice-to-have.

Wrap the constraint query in try/catch, log a one-line warning per
schema, and return an empty PK map. Columns end up with
primaryKey=false; relationship inference still has FK and profiling
to fall back on.
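
The soft-failure shape described above might look roughly like this. It is a
sketch under assumptions: `loadPrimaryKeys`, `ConstraintRow`, and the
`queryConstraints` callback are hypothetical stand-ins for the real
INFORMATION_SCHEMA join, which is not reproduced here.

```typescript
type ConstraintRow = { tableName: string; columnName: string };
type PrimaryKeyMap = Map<string, Set<string>>;

// Hypothetical wrapper: `queryConstraints` stands in for the constraint query.
async function loadPrimaryKeys(
  schema: string,
  queryConstraints: (schema: string) => Promise<ConstraintRow[]>,
  warn: (message: string) => void = () => {},
): Promise<PrimaryKeyMap> {
  try {
    const primaryKeys: PrimaryKeyMap = new Map();
    for (const row of await queryConstraints(schema)) {
      const columns = primaryKeys.get(row.tableName) ?? new Set<string>();
      columns.add(row.columnName);
      primaryKeys.set(row.tableName, columns);
    }
    return primaryKeys;
  } catch (error) {
    // A denied constraint query must not abort the whole introspect.
    const reason = error instanceof Error ? error.message : String(error);
    warn(`primary-key discovery skipped for ${schema}: ${reason}`);
    return new Map(); // every column ends up with primaryKey=false
  }
}
```

A caller would treat an empty map as "no primary keys known" and continue
with columns and row counts.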

* fix(scan): unblock relationship discovery on Snowflake

Two adjacent bugs prevented the scan's relationship pipeline from producing
any joins on a Snowflake warehouse:

- relationship-profiling.ts fell through to a default `GROUP_CONCAT` branch
  for unknown drivers. Snowflake has no GROUP_CONCAT, so every per-table
  profile query failed with "Unknown function GROUP_CONCAT". Add an explicit
  Snowflake branch that uses LISTAGG with a literal '\x1f' delimiter
  (Snowflake requires the delimiter to be a constant, so CHR(31) is rejected).
- description-generation.ts destructured `connector.sampleTable` and
  `connector.sampleColumn` into bare locals, losing the `this` binding when
  the class-method connectors (Snowflake, Postgres, MySQL) were invoked.
  Every sample call threw "Cannot read properties of undefined (reading
  'assertConnection')" and degraded LLM descriptions to metadata-only
  prompts. Call the methods through the connector instead.
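
The lost `this` binding can be reproduced in isolation. `DemoConnector` below
is a hypothetical stand-in, not one of the real connectors; it only mirrors
the shape of a class method that calls another method on `this`:

```typescript
// Hypothetical stand-in for a class-method connector.
class DemoConnector {
  private connected = true;

  private assertConnection(): void {
    if (!this.connected) {
      throw new Error('not connected');
    }
  }

  sampleTable(table: string): string {
    this.assertConnection(); // requires `this` to be the connector instance
    return `sample of ${table}`;
  }
}

// Broken pattern: destructuring detaches the method from its instance.
function sampleUnbound(connector: DemoConnector, table: string): string {
  const { sampleTable } = connector;
  return sampleTable(table); // `this` is undefined inside the method
}

// Fixed pattern: call the method through the connector.
function sampleBound(connector: DemoConnector, table: string): string {
  return connector.sampleTable(table);
}

const demo = new DemoConnector();
console.log(sampleBound(demo, 'orders')); // sample of orders
try {
  sampleUnbound(demo, 'orders');
} catch (error) {
  console.log(error instanceof TypeError); // true
}
```

Plain-object connectors whose functions do not touch `this` would survive the
destructuring, which may be why the bug only showed up for the class-based
ones.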

Without these, even after the primary-key probe is allowed to fail softly,
the scan ends up with 0 validated relationships and an empty `joins:` block
in every shard YAML.
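
The profiling fix can be pictured as a per-driver aggregate expression. This
is only a sketch: `valueListAggregate` and its column argument are
hypothetical, the default branch omits whatever separator the real query may
pass, and the actual SQL in relationship-profiling.ts is not reproduced here.

```typescript
// Hypothetical helper: picks the string-aggregation expression for a driver.
function valueListAggregate(driver: string, columnSql: string): string {
  switch (driver.toLowerCase()) {
    case 'snowflake':
      // Snowflake needs a constant delimiter, so the unit separator is written
      // as the literal '\x1f' in the SQL text rather than as CHR(31).
      return `LISTAGG(${columnSql}, '\\x1f')`;
    default:
      // Fallback for drivers without an explicit branch.
      return `GROUP_CONCAT(${columnSql})`;
  }
}

console.log(valueListAggregate('snowflake', 'ID'));
console.log(valueListAggregate('mysql', 'ID'));
```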

* test(scan): cover table-ref helpers

* feat(scan): plumb tableScope through live-database introspection port

* feat(scan): apply tableScope during metadata fetch

* feat(scan): enforce table scope at fetch boundary

* feat(scan): pool Snowflake sessions and batch enrichment for faster ingest (#206)

* feat(cli): add RSA key-pair auth option to Snowflake setup wizard

Extends the interactive Snowflake setup flow with an authentication-method
prompt (password vs RSA/JWT key-pair). The RSA branch collects a private-key
path (env/file/absolute) and an optional passphrase; the resulting connection
config records `authMethod: 'rsa'` with `privateKey` and `passphrase` instead
of `password`.

* feat(scan): pool Snowflake sessions

* fix(scan): reuse structural snapshots and cleanup connectors

* feat(scan): parallelize relationship profiling

* feat(scan): batch table description generation

* docs: document Snowflake ingest concurrency knobs

* fix(scan): close Snowflake ingest perf verification gaps

* fix(scan): keep batched description failure bounded

* feat(scan): dispatch query-history probes by connection driver

Extract historic-sql dialect resolution into a shared helper so the
status-project readiness check and the local ingest factory agree on
which connections enable query history and which probe to run. The
status command now picks the postgres/snowflake/bigquery probe based on
the connection's driver instead of always reporting against postgres,
which previously caused snowflake connections with queryHistory.enabled
to surface a misleading "driver is snowflake" failure.

Also drops a noisy console.warn from Snowflake primary-key discovery —
INFORMATION_SCHEMA.KEY_COLUMN_USAGE is commonly ungranted for read-only
roles and the FK + profiling paths handle the empty PK map already.

* fix(llm): allow StructuredOutput tool and raise maxTurns for generateObject

The Claude Code agent SDK announces an internal pseudo-tool named
StructuredOutput in the system/init message whenever outputFormat is set
to { type: 'json_schema' }. The runtime's isolation check built its
allowedToolIds set only from MCP tool ids and treated StructuredOutput
as an unexpected host-injected tool, so every generateObject call threw
"Claude Code runtime isolation failed: tools=StructuredOutput ..." and
the table-descriptions and relationship-LLM-proposal enrichment stages
recorded null output across the board.

Whitelist StructuredOutput specifically in generateObject's
allowedToolIds — the check also enforces missing_tools symmetry, so
generateText and runAgentLoop, which do not see StructuredOutput, must
not require it.

generateObject also ran with maxTurns: 1, which the model intermittently
breached when it emitted thinking text before the structured response.
Raised to 5 to give the schema-bound call enough headroom without
allowing unbounded loops. The existing tests now exercise the path with
an init message that announces StructuredOutput so the regression cannot
slip back in.
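
A rough sketch of why the allowance has to be specific to generateObject,
assuming the isolation check compares announced tools against an allowed set
in both directions. `checkToolIsolation` and the tool id `mcp__ktx__query`
are hypothetical; the real runtime check is not shown in this commit message.

```typescript
const STRUCTURED_OUTPUT_TOOL = 'StructuredOutput';

interface IsolationResult {
  unexpectedTools: string[]; // announced but not allowed
  missingTools: string[]; // allowed but not announced
}

// Hypothetical check: the allowed set is the MCP tool ids, plus
// StructuredOutput only when the call sets a json_schema output format.
function checkToolIsolation(
  announced: string[],
  mcpToolIds: string[],
  expectsStructuredOutput: boolean,
): IsolationResult {
  const allowed = new Set(mcpToolIds);
  if (expectsStructuredOutput) {
    allowed.add(STRUCTURED_OUTPUT_TOOL);
  }
  const announcedSet = new Set(announced);
  return {
    unexpectedTools: announced.filter((id) => !allowed.has(id)),
    missingTools: Array.from(allowed).filter((id) => !announcedSet.has(id)),
  };
}

// generateObject: StructuredOutput is announced and allowed.
console.log(checkToolIsolation(['mcp__ktx__query', 'StructuredOutput'], ['mcp__ktx__query'], true));
// generateText: allowing it unconditionally would report it as missing.
console.log(checkToolIsolation(['mcp__ktx__query'], ['mcp__ktx__query'], true));
```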

* chore(scripts): add ktx-reset.sh project-cleanup helper

Convenience script for repeatable ingest testing: takes a project
directory and prunes everything except ktx.yaml and .ktx/secrets/, so
the next ktx setup or ktx ingest run starts from a known-clean state.
2026-05-23 10:41:30 +02:00


import { createBigQueryLiveDatabaseIntrospection } from './connectors/bigquery/live-database-introspection.js';
import { isKtxBigQueryConnectionConfig, KtxBigQueryScanConnector, type KtxBigQueryConnectionConfig } from './connectors/bigquery/connector.js';
import { createClickHouseLiveDatabaseIntrospection } from './connectors/clickhouse/live-database-introspection.js';
import { isKtxClickHouseConnectionConfig } from './connectors/clickhouse/connector.js';
import { createMysqlLiveDatabaseIntrospection } from './connectors/mysql/live-database-introspection.js';
import { isKtxMysqlConnectionConfig } from './connectors/mysql/connector.js';
import { createPostgresLiveDatabaseIntrospection } from './connectors/postgres/live-database-introspection.js';
import { isKtxPostgresConnectionConfig, type KtxPostgresConnectionConfig } from './connectors/postgres/connector.js';
import { KtxPostgresHistoricSqlQueryClient } from './connectors/postgres/historic-sql-query-client.js';
import { createSqliteLiveDatabaseIntrospection } from './connectors/sqlite/live-database-introspection.js';
import { isKtxSqliteConnectionConfig } from './connectors/sqlite/connector.js';
import { createSqlServerLiveDatabaseIntrospection } from './connectors/sqlserver/live-database-introspection.js';
import { isKtxSqlServerConnectionConfig } from './connectors/sqlserver/connector.js';
import { BigQueryHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/bigquery-query-history-reader.js';
import { queryHistoryDialectForConnection } from './context/ingest/adapters/historic-sql/connection-dialect.js';
import { createDaemonLiveDatabaseIntrospection } from './context/ingest/adapters/live-database/daemon-introspection.js';
import { createDefaultLocalIngestAdapters, type DefaultLocalIngestAdaptersOptions } from './context/ingest/local-adapters.js';
import type { HistoricSqlReader } from './context/ingest/adapters/historic-sql/types.js';
import type {
  LiveDatabaseIntrospectionOptions,
  LiveDatabaseIntrospectionPort,
} from './context/ingest/adapters/live-database/types.js';
import { LiveDatabaseSourceAdapter } from './context/ingest/adapters/live-database/live-database.adapter.js';
import { PostgresPgssReader } from './context/ingest/adapters/historic-sql/postgres-pgss-reader.js';
import { SnowflakeHistoricSqlQueryHistoryReader } from './context/ingest/adapters/historic-sql/snowflake-query-history-reader.js';
import type { SourceAdapter } from './context/ingest/types.js';
import type { KtxLocalProject } from './context/project/project.js';
import { createHttpSqlAnalysisPort } from './context/sql-analysis/http-sql-analysis-port.js';
import type { SqlAnalysisPort } from './context/sql-analysis/ports.js';
import {
  createManagedDaemonLookerTableIdentifierParser,
  createManagedDaemonSqlAnalysisPort,
  managedDaemonDatabaseIntrospectionOptions,
  type ManagedPythonCoreDaemonOptions,
} from './managed-python-http.js';
import type { KtxOperationalLogger } from './io/logger.js';
import { resolveKtxConfigReference } from './context/core/config-reference.js';
function hasSnowflakeDriver(connection: unknown): boolean {
  return (
    typeof connection === 'object' &&
    connection !== null &&
    String((connection as { driver?: unknown }).driver ?? '').toLowerCase() === 'snowflake'
  );
}

type SnowflakeConnectorModule = typeof import('./connectors/snowflake/connector.js');

function ktxCliDaemonDatabaseIntrospectionOptions(
  options: KtxCliLocalIngestAdaptersOptions,
): DefaultLocalIngestAdaptersOptions['databaseIntrospection'] {
  if (options.databaseIntrospectionUrl || options.databaseIntrospection?.requestJson || !options.managedDaemon) {
    return options.databaseIntrospection;
  }
  return {
    ...(options.databaseIntrospection ?? {}),
    ...managedDaemonDatabaseIntrospectionOptions(options.managedDaemon),
  };
}

function ktxCliLookerOptions(
  options: KtxCliLocalIngestAdaptersOptions,
): DefaultLocalIngestAdaptersOptions['looker'] {
  const looker = options.looker;
  if (looker?.parser || looker?.daemonBaseUrl || process.env.KTX_DAEMON_URL || !options.managedDaemon) {
    return looker;
  }
  return {
    ...(looker ?? {}),
    parser: createManagedDaemonLookerTableIdentifierParser(options.managedDaemon),
  };
}

// Resolution order: explicit port, explicit URL, KTX_SQL_ANALYSIS_URL,
// KTX_DAEMON_URL, the managed daemon, then the local default endpoint.
function ktxCliHistoricSqlAnalysis(options: KtxCliLocalIngestAdaptersOptions) {
  if (options.sqlAnalysis) {
    return options.sqlAnalysis;
  }
  if (options.sqlAnalysisUrl) {
    return createHttpSqlAnalysisPort({ baseUrl: options.sqlAnalysisUrl });
  }
  if (process.env.KTX_SQL_ANALYSIS_URL) {
    return createHttpSqlAnalysisPort({ baseUrl: process.env.KTX_SQL_ANALYSIS_URL });
  }
  if (process.env.KTX_DAEMON_URL) {
    return createHttpSqlAnalysisPort({ baseUrl: process.env.KTX_DAEMON_URL });
  }
  if (options.managedDaemon) {
    return createManagedDaemonSqlAnalysisPort(options.managedDaemon);
  }
  return createHttpSqlAnalysisPort({ baseUrl: 'http://127.0.0.1:8765' });
}
function createKtxCliLiveDatabaseIntrospection(
  project: KtxLocalProject,
  options: KtxCliLocalIngestAdaptersOptions = {},
): LiveDatabaseIntrospectionPort {
  const databaseIntrospection = ktxCliDaemonDatabaseIntrospectionOptions(options);
  const daemon = createDaemonLiveDatabaseIntrospection({
    connections: project.config.connections,
    ...databaseIntrospection,
    ...(options.databaseIntrospectionUrl ? { baseUrl: options.databaseIntrospectionUrl } : {}),
  });
  const sqlite = createSqliteLiveDatabaseIntrospection({
    projectDir: project.projectDir,
    connections: project.config.connections,
  });
  const mysql = createMysqlLiveDatabaseIntrospection({
    connections: project.config.connections,
  });
  const postgres = createPostgresLiveDatabaseIntrospection({
    connections: project.config.connections,
  });
  const clickhouse = createClickHouseLiveDatabaseIntrospection({
    connections: project.config.connections,
  });
  const sqlserver = createSqlServerLiveDatabaseIntrospection({
    connections: project.config.connections,
  });
  const bigquery = createBigQueryLiveDatabaseIntrospection({
    connections: project.config.connections,
  });
  return {
    async extractSchema(connectionId: string, options?: LiveDatabaseIntrospectionOptions) {
      const connection = project.config.connections[connectionId];
      if (isKtxPostgresConnectionConfig(connection)) {
        return postgres.extractSchema(connectionId, options);
      }
      if (isKtxSqliteConnectionConfig(connection)) {
        return sqlite.extractSchema(connectionId, options);
      }
      if (isKtxMysqlConnectionConfig(connection)) {
        return mysql.extractSchema(connectionId, options);
      }
      if (isKtxClickHouseConnectionConfig(connection)) {
        return clickhouse.extractSchema(connectionId, options);
      }
      if (isKtxSqlServerConnectionConfig(connection)) {
        return sqlserver.extractSchema(connectionId, options);
      }
      if (isKtxBigQueryConnectionConfig(connection)) {
        return bigquery.extractSchema(connectionId, options);
      }
      if (hasSnowflakeDriver(connection)) {
        // Snowflake modules are imported on demand. A config that names the
        // driver but fails the type guard falls back to the daemon.
        const { createSnowflakeLiveDatabaseIntrospection } = await import('./connectors/snowflake/live-database-introspection.js');
        const { isKtxSnowflakeConnectionConfig } = await import('./connectors/snowflake/connector.js');
        if (!isKtxSnowflakeConnectionConfig(connection)) {
          return daemon.extractSchema(connectionId, options);
        }
        const snowflake = createSnowflakeLiveDatabaseIntrospection({
          connections: project.config.connections,
          projectDir: project.projectDir,
        });
        return snowflake.extractSchema(connectionId, options);
      }
      return daemon.extractSchema(connectionId, options);
    },
  };
}

export interface KtxCliLocalIngestAdaptersOptions extends DefaultLocalIngestAdaptersOptions {
  historicSqlConnectionId?: string;
  sqlAnalysis?: SqlAnalysisPort;
  sqlAnalysisUrl?: string;
  managedDaemon?: ManagedPythonCoreDaemonOptions;
  logger?: KtxOperationalLogger;
}
function createEphemeralPostgresHistoricSqlClient(project: KtxLocalProject, connectionId: string) {
  const connection = project.config.connections[connectionId] as KtxPostgresConnectionConfig | undefined;
  const inputDriver = connection?.driver ?? 'unknown';
  if (!isKtxPostgresConnectionConfig(connection)) {
    throw new Error(`Query history ingest requires a Postgres connection, got ${String(inputDriver)}`);
  }
  return {
    async executeQuery(sql: string, params?: unknown[]) {
      const client = new KtxPostgresHistoricSqlQueryClient({
        connectionId,
        connection,
      });
      try {
        return await client.executeQuery(sql, params);
      } finally {
        await client.cleanup();
      }
    },
  };
}

function createEphemeralBigQueryHistoricSqlClient(project: KtxLocalProject, connectionId: string) {
  const connection = project.config.connections[connectionId] as KtxBigQueryConnectionConfig | undefined;
  const inputDriver = connection?.driver ?? 'unknown';
  if (!isKtxBigQueryConnectionConfig(connection)) {
    throw new Error(`Query history ingest requires a BigQuery connection, got ${String(inputDriver)}`);
  }
  return {
    async executeQuery(query: string) {
      const connector = new KtxBigQueryScanConnector({
        connectionId,
        connection,
      });
      try {
        const result = await connector.executeReadOnly({ connectionId, sql: query }, {} as never);
        return {
          headers: result.headers,
          rows: result.rows,
          totalRows: result.totalRows,
        };
      } finally {
        await connector.cleanup();
      }
    },
  };
}

async function createEphemeralSnowflakeHistoricSqlClient(
  project: KtxLocalProject,
  connectionId: string,
  connectorModule: SnowflakeConnectorModule,
) {
  const connection = project.config.connections[connectionId];
  const inputDriver = connection?.driver ?? 'unknown';
  if (!connectorModule.isKtxSnowflakeConnectionConfig(connection)) {
    throw new Error(`Query history ingest requires a Snowflake connection, got ${String(inputDriver)}`);
  }
  return {
    async executeQuery(query: string) {
      const connector = new connectorModule.KtxSnowflakeScanConnector({
        connectionId,
        connection,
        projectDir: project.projectDir,
      });
      try {
        const result = await connector.executeReadOnly({ connectionId, sql: query }, {} as never);
        return {
          headers: result.headers,
          rows: result.rows,
          totalRows: result.totalRows,
        };
      } finally {
        await connector.cleanup();
      }
    },
  };
}
function bigQueryProjectId(connection: KtxBigQueryConnectionConfig, env: NodeJS.ProcessEnv): string {
  const raw = typeof connection.credentials_json === 'string' ? connection.credentials_json : '';
  const resolved = resolveKtxConfigReference(raw, env);
  if (!resolved) {
    throw new Error('Query history BigQuery connection requires credentials_json');
  }
  const parsed = JSON.parse(resolved) as { project_id?: unknown };
  if (typeof parsed.project_id !== 'string' || parsed.project_id.trim().length === 0) {
    throw new Error('Query history BigQuery connection requires credentials_json.project_id');
  }
  return parsed.project_id;
}

function bigQueryRegion(connection: KtxBigQueryConnectionConfig): string {
  return typeof connection.location === 'string' && connection.location.trim().length > 0
    ? connection.location.trim()
    : 'us';
}
function historicSqlOptionsForLocalRun(project: KtxLocalProject, options: KtxCliLocalIngestAdaptersOptions) {
  const connectionId = options.historicSqlConnectionId;
  if (!connectionId) {
    return undefined;
  }
  const connection = project.config.connections[connectionId];
  const dialect = queryHistoryDialectForConnection(connection);
  if (!dialect) {
    return undefined;
  }
  const base = {
    sqlAnalysis: ktxCliHistoricSqlAnalysis(options),
  };
  if (dialect === 'postgres') {
    return {
      ...base,
      reader: new PostgresPgssReader() satisfies HistoricSqlReader,
      queryClient: createEphemeralPostgresHistoricSqlClient(project, connectionId),
    };
  }
  if (dialect === 'bigquery') {
    const inputDriver = connection?.driver ?? 'unknown';
    if (!isKtxBigQueryConnectionConfig(connection)) {
      throw new Error(`Query history ingest requires a BigQuery connection, got ${String(inputDriver)}`);
    }
    return {
      ...base,
      reader: new BigQueryHistoricSqlQueryHistoryReader({
        projectId: bigQueryProjectId(connection, process.env),
        region: bigQueryRegion(connection),
      }) satisfies HistoricSqlReader,
      queryClient: createEphemeralBigQueryHistoricSqlClient(project, connectionId),
    };
  }
  // Any other dialect is handled as Snowflake; the connector module is
  // imported when a query runs.
  return {
    ...base,
    reader: new SnowflakeHistoricSqlQueryHistoryReader() satisfies HistoricSqlReader,
    queryClient: {
      async executeQuery(query: string) {
        const connectorModule = await import('./connectors/snowflake/connector.js');
        const client = await createEphemeralSnowflakeHistoricSqlClient(project, connectionId, connectorModule);
        return client.executeQuery(query);
      },
    },
  };
}

export function createKtxCliLocalIngestAdapters(
  project: KtxLocalProject,
  options: KtxCliLocalIngestAdaptersOptions = {},
): SourceAdapter[] {
  const historicSql = historicSqlOptionsForLocalRun(project, options);
  const base = createDefaultLocalIngestAdapters(project, {
    ...options,
    databaseIntrospection: ktxCliDaemonDatabaseIntrospectionOptions(options),
    looker: ktxCliLookerOptions(options),
    ...(historicSql ? { historicSql } : {}),
  });
  const liveDatabase = new LiveDatabaseSourceAdapter({
    introspection: createKtxCliLiveDatabaseIntrospection(project, options),
  });
  // Swap the default live-database adapter for the CLI one that dispatches by driver.
  return base.map((adapter) => (adapter.source === 'live-database' ? liveDatabase : adapter));
}