feat(scan): pool Snowflake sessions and batch enrichment for faster ingest (#206)

* feat(cli): add RSA key-pair auth option to Snowflake setup wizard

Extends the interactive Snowflake setup flow with an authentication-method
prompt (password vs RSA/JWT key-pair). The RSA branch collects a private-key
path (env/file/absolute) and an optional passphrase; the resulting connection
config records `authMethod: 'rsa'` with `privateKey` and `passphrase` instead
of `password`.

* feat(scan): pool Snowflake sessions

* fix(scan): reuse structural snapshots and cleanup connectors

* feat(scan): parallelize relationship profiling

* feat(scan): batch table description generation

* docs: document Snowflake ingest concurrency knobs

* fix(scan): close Snowflake ingest perf verification gaps

* fix(scan): keep batched description failure bounded
This commit is contained in:
Andrey Avtomonov 2026-05-23 02:39:45 +02:00 committed by GitHub
parent 6e31687782
commit 4e654c43c1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 1247 additions and 237 deletions

View file

@ -157,6 +157,12 @@ connections:
dataset_ids: [analytics, mart]
```
For Snowflake connections, set `maxSessions` when deep ingest needs more or
fewer concurrent warehouse sessions. The default is `4`. This caps all
concurrent Snowflake SQL work for that connector instance, including schema
introspection, table sampling, relationship profiling, relationship
validation, and read-only SQL execution.
For Postgres, BigQuery, and Snowflake, `historicSql` and `context.queryHistory`
toggle query-history ingest. The shape is connector-specific; the setup wizard
writes these fields when you pass `--enable-query-history`.
@ -483,6 +489,7 @@ scan:
maxLlmTablesPerBatch: 40
maxCandidatesPerColumn: 25
profileSampleRows: 10000
profileConcurrency: 4
validationConcurrency: 4
validationBudget: all
```
@ -510,6 +517,7 @@ the manifest.
| `relationships.maxLlmTablesPerBatch` | `int > 0` | `40` | Max tables included in a single LLM relationship-proposal batch. |
| `relationships.maxCandidatesPerColumn` | `int > 0` | `25` | Max join partners considered per column. |
| `relationships.profileSampleRows` | `int > 0` | `10000` | Rows sampled per table when profiling values for relationship inference. |
| `relationships.profileConcurrency` | `int > 0` | `4` | Parallel relationship-profile queries against the database. For Snowflake, effective database concurrency is also bounded by the connection's `maxSessions`. |
| `relationships.validationConcurrency` | `int > 0` | `4` | Parallel relationship validation queries against the database. |
| `relationships.validationBudget` | `all` \| `int ≥ 0` | runtime default | Cap on validation queries per scan. `all` means unlimited. |

View file

@ -1,4 +1,12 @@
import { describe, expect, it, vi } from 'vitest';
const createPool = vi.hoisted(() => vi.fn());
vi.mock('snowflake-sdk', () => ({
default: { createPool },
createPool,
}));
import { createSnowflakeLiveDatabaseIntrospection } from '../../connectors/snowflake/live-database-introspection.js';
import { isKtxSnowflakeConnectionConfig, KtxSnowflakeScanConnector, snowflakeConnectionConfigFromConfig, type KtxSnowflakeDriver, type KtxSnowflakeDriverFactory } from '../../connectors/snowflake/connector.js';
import { tableRefSet } from '../../context/scan/table-ref.js';
@ -64,6 +72,38 @@ function fakeDriverFactory(): KtxSnowflakeDriverFactory {
return { createDriver: vi.fn(() => driver) };
}
function fakeSnowflakeStatement(headers: string[] = ['ONE']) {
return {
getColumns: () => headers.map((header) => ({ getName: () => header, getType: () => 'TEXT' })),
};
}
function installSnowflakePoolMock() {
const executedSql: string[] = [];
const connection = {
execute: vi.fn(
(input: {
sqlText: string;
complete: (
error: Error | null,
statement: ReturnType<typeof fakeSnowflakeStatement>,
rows: Array<Record<string, unknown>>,
) => void;
}) => {
executedSql.push(input.sqlText);
input.complete(null, fakeSnowflakeStatement(), [{ ONE: 1 }]);
},
),
};
const pool = {
use: vi.fn(async (fn: (conn: typeof connection) => Promise<unknown>) => fn(connection)),
drain: vi.fn(async () => undefined),
clear: vi.fn(async () => undefined),
};
createPool.mockReturnValue(pool);
return { connection, pool, executedSql };
}
describe('KtxSnowflakeScanConnector', () => {
it('resolves Snowflake connection configuration safely', () => {
expect(
@ -100,6 +140,99 @@ describe('KtxSnowflakeScanConnector', () => {
});
});
it('defaults and validates Snowflake maxSessions', () => {
const baseConnection = {
driver: 'snowflake',
authMethod: 'password',
account: 'acct',
warehouse: 'WH',
database: 'ANALYTICS',
schema_name: 'PUBLIC',
username: 'reader',
password: 'fixture-pass', // pragma: allowlist secret
} as const;
expect(
snowflakeConnectionConfigFromConfig({
connectionId: 'warehouse',
connection: baseConnection,
}),
).toMatchObject({ maxSessions: 4 });
expect(
snowflakeConnectionConfigFromConfig({
connectionId: 'warehouse',
connection: { ...baseConnection, maxSessions: 8 },
}),
).toMatchObject({ maxSessions: 8 });
for (const maxSessions of [0, -1, 1.5, Number.NaN]) {
expect(() =>
snowflakeConnectionConfigFromConfig({
connectionId: 'warehouse',
connection: { ...baseConnection, maxSessions },
}),
).toThrow('connections.warehouse.maxSessions must be a positive integer');
}
});
it('uses one lazy Snowflake pool and drains it during cleanup', async () => {
const { pool, executedSql } = installSnowflakePoolMock();
const close = vi.fn(async () => undefined);
const connector = new KtxSnowflakeScanConnector({
connectionId: 'warehouse',
connection: {
driver: 'snowflake',
authMethod: 'password',
account: 'acct',
warehouse: 'WH',
database: 'ANALYTICS',
schema_name: 'PUBLIC',
username: 'reader',
password: 'fixture-pass', // pragma: allowlist secret
role: 'ANALYST',
maxSessions: 3,
},
sdkOptionsProvider: {
resolve: vi.fn(async () => ({ sdkOptions: { application: 'ktx-test' }, close })),
},
});
expect(createPool).not.toHaveBeenCalled();
await connector.executeReadOnly({ connectionId: 'warehouse', sql: 'select 1', maxRows: 1 }, { runId: 'run-1' });
await connector.executeReadOnly({ connectionId: 'warehouse', sql: 'select 1', maxRows: 1 }, { runId: 'run-1' });
expect(createPool).toHaveBeenCalledTimes(1);
expect(createPool).toHaveBeenCalledWith(
expect.objectContaining({
account: 'acct',
username: 'reader',
warehouse: 'WH',
database: 'ANALYTICS',
schema: 'PUBLIC',
role: 'ANALYST',
password: 'fixture-pass', // pragma: allowlist secret
clientSessionKeepAlive: true,
clientSessionKeepAliveHeartbeatFrequency: 900,
application: 'ktx-test',
}),
expect.objectContaining({
min: 0,
max: 3,
evictionRunIntervalMillis: 30_000,
acquireTimeoutMillis: 60_000,
}),
);
expect(pool.use).toHaveBeenCalledTimes(2);
expect(executedSql.some((sql) => /^USE\s+/i.test(sql.trim()))).toBe(false);
await connector.cleanup();
expect(pool.drain).toHaveBeenCalledBefore(pool.clear);
expect(pool.clear).toHaveBeenCalledTimes(1);
expect(close).toHaveBeenCalledTimes(1);
});
it('introspects schema, primary keys, comments, row counts, and dimensions', async () => {
const connector = new KtxSnowflakeScanConnector({
connectionId: 'warehouse',

View file

@ -24,6 +24,7 @@ export interface KtxSnowflakeConnectionConfig {
privateKey?: string;
passphrase?: string;
role?: string;
maxSessions?: number;
[key: string]: unknown;
}
@ -38,6 +39,7 @@ export interface KtxSnowflakeResolvedConnectionConfig {
privateKey?: string;
passphrase?: string;
role?: string;
maxSessions: number;
}
export interface KtxSnowflakeRawColumnMetadata {
@ -132,6 +134,23 @@ function stringConfigValue(
return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined;
}
function positiveIntegerConfigValue(input: {
connection: KtxSnowflakeConnectionConfig;
key: keyof KtxSnowflakeConnectionConfig;
connectionId: string;
defaultValue: number;
}): number {
const value = input.connection[input.key];
if (value === undefined) {
return input.defaultValue;
}
const numberValue = Number(value);
if (!Number.isInteger(numberValue) || numberValue < 1) {
throw new Error(`connections.${input.connectionId}.${String(input.key)} must be a positive integer`);
}
return numberValue;
}
function schemaNames(connection: KtxSnowflakeConnectionConfig, env: NodeJS.ProcessEnv): string[] {
if (Array.isArray(connection.schema_names) && connection.schema_names.length > 0) {
return connection.schema_names
@ -230,6 +249,12 @@ export function snowflakeConnectionConfigFromConfig(input: {
database,
schemas: resolvedSchemas,
username,
maxSessions: positiveIntegerConfigValue({
connection: input.connection,
key: 'maxSessions',
connectionId: input.connectionId,
defaultValue: 4,
}),
};
const role = stringConfigValue(input.connection, 'role', env);
if (role) {
@ -265,6 +290,7 @@ class DefaultSnowflakeDriverFactory implements KtxSnowflakeDriverFactory {
class SnowflakeSdkDriver implements KtxSnowflakeDriver {
private closeSdkOptions: Array<() => Promise<void>> = [];
private pool: ReturnType<typeof snowflake.createPool> | null = null;
constructor(
private readonly resolved: KtxSnowflakeResolvedConnectionConfig,
@ -285,16 +311,21 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
}
async query(sql: string, params?: unknown): Promise<KtxQueryResult> {
let connection: Connection | null = null;
const binds = Array.isArray(params) ? toSnowflakeBinds(params) : undefined;
try {
connection = await this.createConnection();
const binds = Array.isArray(params) ? toSnowflakeBinds(params) : undefined;
const result = await this.executeSnowflakeQuery(connection, sql, binds);
const pool = await this.getPool();
const result = await pool.use(async (connection: snowflake.Connection) =>
this.executeSnowflakeQuery(connection, sql, binds),
);
return { ...result, totalRows: result.rows.length, rowCount: result.rows.length };
} finally {
if (connection) {
await this.destroyConnection(connection);
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
if (/timeout/i.test(message) && /pool|acquire/i.test(message)) {
throw new Error(
"Snowflake session pool exhausted after 60s - consider lowering maxSessions or increasing your account's concurrent-statement limit.",
);
}
throw error;
}
}
@ -375,27 +406,41 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
}
async cleanup(): Promise<void> {
const pool = this.pool;
this.pool = null;
if (pool) {
// Drain before clear so in-flight Snowflake statements finish before idle
// sessions are closed.
await pool.drain();
await pool.clear();
}
const closers = this.closeSdkOptions;
this.closeSdkOptions = [];
await Promise.all(closers.map((close) => close()));
await Promise.all(closers.map((close) => Promise.resolve(close())));
}
private async runTest(): Promise<{ success: boolean; error?: string }> {
let connection: Connection | null = null;
try {
connection = await this.createConnection();
await this.executeSnowflakeQuery(connection, 'SELECT 1');
await this.query('SELECT 1');
return { success: true };
} catch (error) {
return { success: false, error: error instanceof Error ? error.message : String(error) };
} finally {
if (connection) {
await this.destroyConnection(connection);
}
}
}
private async createConnection(): Promise<Connection> {
private async getPool(): Promise<ReturnType<typeof snowflake.createPool>> {
if (!this.pool) {
this.pool = snowflake.createPool(await this.resolveConnectionOptions(), {
min: 0,
max: this.resolved.maxSessions,
evictionRunIntervalMillis: 30_000,
acquireTimeoutMillis: 60_000,
});
}
return this.pool;
}
private async resolveConnectionOptions(): Promise<snowflake.ConnectionOptions> {
const patch = await this.sdkOptionsProvider?.resolve({
account: this.resolved.account,
connection: { ...this.resolved, driver: 'snowflake' },
@ -411,47 +456,13 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
database: this.resolved.database,
...(sessionSchema ? { schema: sessionSchema } : {}),
role: this.resolved.role,
clientSessionKeepAlive: true,
clientSessionKeepAliveHeartbeatFrequency: 900,
...patch?.sdkOptions,
};
const connectionConfig: ConnectionOptions =
this.resolved.authMethod === 'rsa'
? { ...baseConfig, authenticator: 'SNOWFLAKE_JWT', privateKey: this.decryptPrivateKey() }
: { ...baseConfig, password: this.resolved.password };
const connection = snowflake.createConnection(connectionConfig);
return new Promise((resolveConnection, rejectConnection) => {
connection.connect((error, connected) => {
if (error) {
rejectConnection(error);
return;
}
const resolvedConnection = connected ?? connection;
this.setConnectionContext(resolvedConnection).then(
() => resolveConnection(resolvedConnection),
(contextError) => {
resolvedConnection.destroy(() => undefined);
rejectConnection(contextError);
},
);
});
});
}
private async setConnectionContext(connection: Connection): Promise<void> {
if (this.resolved.role) {
await this.executeSnowflakeQuery(connection, `USE ROLE ${quoteSnowflakeIdentifier(this.resolved.role, 'role')}`);
}
await this.executeSnowflakeQuery(
connection,
`USE WAREHOUSE ${quoteSnowflakeIdentifier(this.resolved.warehouse, 'warehouse')}`,
);
await this.executeSnowflakeQuery(
connection,
`USE DATABASE ${quoteSnowflakeIdentifier(this.resolved.database, 'database')}`,
);
await this.executeSnowflakeQuery(
connection,
`USE SCHEMA ${quoteSnowflakeIdentifier(this.resolved.schemas[0] ?? 'PUBLIC', 'schema')}`,
);
return this.resolved.authMethod === 'rsa'
? { ...baseConfig, authenticator: 'SNOWFLAKE_JWT', privateKey: this.decryptPrivateKey() }
: { ...baseConfig, password: this.resolved.password };
}
private async executeSnowflakeQuery(
@ -480,18 +491,6 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver {
});
}
private destroyConnection(connection: Connection): Promise<void> {
return new Promise((resolveDestroy, rejectDestroy) => {
connection.destroy((error) => {
if (error) {
rejectDestroy(error);
return;
}
resolveDestroy();
});
});
}
private decryptPrivateKey(): string {
if (!this.resolved.privateKey) {
throw new Error('Private key is required for RSA authentication');

View file

@ -74,6 +74,7 @@ connections:
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
},
@ -278,6 +279,7 @@ scan:
maxLlmTablesPerBatch: 12
maxCandidatesPerColumn: 7
profileSampleRows: 500
profileConcurrency: 3
validationConcurrency: 2
validationBudget: 0
`);
@ -291,6 +293,7 @@ scan:
maxLlmTablesPerBatch: 12,
maxCandidatesPerColumn: 7,
profileSampleRows: 500,
profileConcurrency: 3,
validationConcurrency: 2,
validationBudget: 0,
});
@ -302,6 +305,7 @@ scan:
expect(serializeKtxProjectConfig(config)).toContain('maxLlmTablesPerBatch: 12');
expect(serializeKtxProjectConfig(config)).toContain('maxCandidatesPerColumn: 7');
expect(serializeKtxProjectConfig(config)).toContain('profileSampleRows: 500');
expect(serializeKtxProjectConfig(config)).toContain('profileConcurrency: 3');
expect(serializeKtxProjectConfig(config)).toContain('validationConcurrency: 2');
expect(serializeKtxProjectConfig(config)).toContain('validationBudget: 0');
});
@ -326,6 +330,7 @@ scan:
maxLlmTablesPerBatch: 0
maxCandidatesPerColumn: -4
profileSampleRows: 0
profileConcurrency: 0
validationConcurrency: 0
validationBudget: 1.5
`;
@ -341,6 +346,7 @@ scan:
'scan.relationships.maxLlmTablesPerBatch',
'scan.relationships.maxCandidatesPerColumn',
'scan.relationships.profileSampleRows',
'scan.relationships.profileConcurrency',
'scan.relationships.validationConcurrency',
'scan.relationships.validationBudget',
]),

View file

@ -163,6 +163,11 @@ const scanRelationshipsSchema = z
.default(25)
.describe('Maximum number of candidate join partners considered per column during relationship discovery.'),
profileSampleRows: z.int().positive().default(10000).describe('Number of rows sampled per table when profiling values for relationship inference.'),
profileConcurrency: z
.int()
.positive()
.default(4)
.describe('Parallel relationship-profile queries run against the database during scan.'),
validationConcurrency: z.int().positive().default(4).describe('Number of relationship validation queries run in parallel against the database.'),
validationBudget: z
.union([z.literal('all'), z.int().nonnegative()])

View file

@ -378,6 +378,121 @@ describe('KtxDescriptionGenerator', () => {
expect(cache.set).toHaveBeenCalledWith('warehouse.public.orders', 'Commerce orders');
expect(cache.set).toHaveBeenCalledWith('__connection:Warehouse', 'Commerce orders');
});
it('generates one structured table description and reuses table samples for all columns', async () => {
const llmRuntime = createLlmProvider('unused');
llmRuntime.generateObject = vi.fn(async () => ({
tableDescription: 'Commerce orders',
columns: [
{ name: 'status', description: 'Current order state' },
{ name: 'amount', description: 'Order amount in dollars' },
],
}));
const connector = createConnector();
const generator = new KtxDescriptionGenerator({
llmRuntime,
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateBatchedTableDescriptions({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
rawDescriptions: { db: 'Orders fact table' },
columns: [
{ name: 'status', type: 'text' },
{ name: 'amount', type: 'numeric' },
],
},
});
expect(result.tableDescription).toBe('Commerce orders');
expect(Object.fromEntries(result.columnDescriptions)).toEqual({
status: 'Current order state',
amount: 'Order amount in dollars',
});
expect(connector.sampleTable).toHaveBeenCalledTimes(1);
expect(connector.sampleColumn).not.toHaveBeenCalled();
expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1);
expect(llmRuntime.generateText).not.toHaveBeenCalled();
});
it('falls back to one column generateText call for each missing structured column', async () => {
const llmRuntime = createLlmProvider('Fallback status');
llmRuntime.generateObject = vi.fn(async () => ({
tableDescription: 'Commerce orders',
columns: [{ name: 'amount', description: 'Order amount in dollars' }],
}));
const connector = createConnector();
const generator = new KtxDescriptionGenerator({
llmRuntime,
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateBatchedTableDescriptions({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [
{ name: 'status', type: 'text' },
{ name: 'amount', type: 'numeric' },
],
},
});
expect(Object.fromEntries(result.columnDescriptions)).toEqual({
status: 'Fallback status',
amount: 'Order amount in dollars',
});
expect(connector.sampleColumn).not.toHaveBeenCalled();
expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1);
expect(llmRuntime.generateText).toHaveBeenCalledTimes(1);
});
it('does not run per-column fallback when structured object generation throws', async () => {
const llmRuntime = createLlmProvider('Fallback description');
llmRuntime.generateObject = vi.fn(async () => {
throw new Error('object output unavailable');
});
const warnings: string[] = [];
const generator = new KtxDescriptionGenerator({
llmRuntime,
onWarning: (warning) => warnings.push(warning.code),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateBatchedTableDescriptions({
connectionId: 'conn-1',
connector: createConnector(),
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [{ name: 'status', type: 'text' }],
},
});
expect(result.tableDescription).toBeNull();
expect(Object.fromEntries(result.columnDescriptions)).toEqual({ status: null });
expect(warnings).toContain('enrichment_failed');
expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1);
expect(llmRuntime.generateText).not.toHaveBeenCalled();
});
});
describe('KtxDescriptionGenerator resilience', () => {

View file

@ -1,4 +1,5 @@
import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import { z } from 'zod';
import type {
KtxColumnSampleInput,
KtxColumnSampleResult,
@ -53,7 +54,7 @@ export interface KtxDescriptionColumn {
sampleValues?: unknown[];
}
export interface KtxDescriptionColumnTable extends KtxTableRef {
interface KtxDescriptionColumnTable extends KtxTableRef {
columns: KtxDescriptionColumn[];
}
@ -112,6 +113,23 @@ export interface KtxGenerateTableDescriptionInput {
table: KtxDescriptionTableInput;
}
export interface KtxGenerateBatchedTableDescriptionsInput {
connectionId: string;
connector: KtxDescriptionSamplingPort;
context: KtxScanContext;
dataSourceType: string;
supportsNestedAnalysis: boolean;
table: KtxDescriptionColumnTable & {
rawDescriptions?: Record<string, string>;
columns: Array<KtxDescriptionColumn & { type?: string; comment?: string | null }>;
};
}
export interface KtxBatchedTableDescriptionsResult {
tableDescription: string | null;
columnDescriptions: Map<string, string | null>;
}
export interface KtxGenerateDataSourceDescriptionInput {
connectionId: string;
connector: KtxDescriptionSamplingPort;
@ -136,6 +154,18 @@ interface ColumnTaskResult {
skipped: boolean;
}
const batchedTableDescriptionSchema = z.object({
tableDescription: z.string(),
columns: z.array(
z.object({
name: z.string(),
description: z.string(),
}),
),
});
type BatchedTableDescriptionOutput = z.infer<typeof batchedTableDescriptionSchema>;
function descriptionSources(rawDescriptions: Record<string, string> | undefined): Array<[string, string]> {
if (!rawDescriptions) {
return [];
@ -250,6 +280,76 @@ function wordLimitLine(maxWords: number): string {
return `Please provide a concise description in ${maxWords} words or less.`;
}
function sampleValuesByColumn(
columns: readonly KtxDescriptionColumn[],
sampleData: KtxTableSampleResult | null,
): Map<string, unknown[]> {
const values = new Map<string, unknown[]>();
for (const column of columns) {
const existingValues = column.sampleValues?.filter((value) => value !== null && value !== undefined) ?? [];
if (existingValues.length > 0) {
values.set(column.name, existingValues);
}
}
if (!sampleData) {
return values;
}
for (const column of columns) {
const index = sampleData.headers.findIndex((header) => header.toLowerCase() === column.name.toLowerCase());
if (index < 0) {
continue;
}
const sampledValues = sampleData.rows
.map((row) => row[index])
.filter((value) => value !== null && value !== undefined);
if (sampledValues.length > 0) {
values.set(column.name, sampledValues);
}
}
return values;
}
function batchedPrompt(input: {
table: KtxGenerateBatchedTableDescriptionsInput['table'];
sampleData: KtxTableSampleResult | null;
dataSourceType: string;
tableMaxWords: number;
columnMaxWords: number;
}): KtxDescriptionPrompt {
const columnLines = input.table.columns
.map((column) => {
const typePart = column.type ? ` (${column.type})` : '';
const commentPart = column.rawDescriptions?.db ? ` - ${column.rawDescriptions.db}` : '';
return `- ${column.name}${typePart}${commentPart}`;
})
.join('\n');
const sampleLines =
input.sampleData && input.sampleData.rows.length > 0
? input.sampleData.rows
.slice(0, 5)
.map((row) =>
input.sampleData!.headers.map((header, index) => `${header}=${String(row[index] ?? '')}`).join(', '),
)
.join('\n')
: 'unavailable';
return {
system: [
'Analyze one database table and return structured JSON matching the supplied schema.',
`The table description must be ${input.tableMaxWords} words or less.`,
`Each column description must be ${input.columnMaxWords} words or less.`,
'Describe business meaning directly. Do not repeat table or column names as filler.',
].join('\n'),
user: [
`Table: ${input.table.name}`,
`Data source type: ${input.dataSourceType}`,
'Columns:',
columnLines,
'Sample rows:',
sampleLines,
].join('\n'),
};
}
/** @internal */
export function buildKtxColumnDescriptionPrompt(
input: KtxColumnDescriptionPromptInput & { maxWords?: number },
@ -582,6 +682,156 @@ export class KtxDescriptionGenerator {
}
}
async generateBatchedTableDescriptions(
input: KtxGenerateBatchedTableDescriptionsInput,
): Promise<KtxBatchedTableDescriptionsResult> {
const tableRef = toTableRef(input.table);
let sampleData: KtxTableSampleResult | null = null;
let fallbackReason: 'capability_missing' | 'sampling_failed' | 'empty_sample' | null = null;
if (!input.connector.sampleTable) {
fallbackReason = 'capability_missing';
this.logger?.warn('KTX scan connector does not support table sampling; falling back to metadata-only prompt', {
connectorId: input.connector.id,
table: input.table.name,
});
this.onWarning?.({
code: 'connector_capability_missing',
message: `Connector ${input.connector.id} does not support sampleTable; using metadata-only description prompt`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, capability: 'sampleTable' },
});
} else {
try {
sampleData = await retryAsync(
() =>
input.connector.sampleTable!(
{
connectionId: input.connectionId,
table: tableRef,
limit: 20,
},
input.context,
),
{
attempts: 3,
baseDelayMs: 200,
signal: input.context.signal,
onAttemptFailure: (error, attempt) => {
this.logger?.warn(`sampleTable attempt ${attempt} failed for ${input.table.name}: ${errorMessage(error)}`, {
connectorId: input.connector.id,
table: input.table.name,
attempt,
});
},
},
);
if (sampleData.rows.length === 0) {
fallbackReason = 'empty_sample';
this.logger?.warn('sampleTable returned no rows; using metadata-only prompt', {
connectorId: input.connector.id,
table: input.table.name,
});
}
} catch (error) {
if (error instanceof KtxAbortedError) {
throw error;
}
fallbackReason = 'sampling_failed';
this.logger?.error(`sampleTable exhausted retries for ${input.table.name}: ${errorMessage(error)}`, {
connectorId: input.connector.id,
table: input.table.name,
});
this.onWarning?.({
code: 'sampling_failed',
message: `Failed to sample table ${input.table.name} after retries: ${errorMessage(error)}`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, error: errorMessage(error) },
});
}
}
const sampleValues = sampleValuesByColumn(input.table.columns, sampleData);
const descriptions = new Map<string, string | null>();
let tableDescription: string | null = null;
let structuredGenerationSucceeded = false;
try {
const prompt = batchedPrompt({
table: input.table,
sampleData,
dataSourceType: input.dataSourceType,
tableMaxWords: this.settings.tableMaxWords,
columnMaxWords: this.settings.columnMaxWords,
});
const generated = await this.llmRuntime.generateObject<
BatchedTableDescriptionOutput,
typeof batchedTableDescriptionSchema
>({
role: 'candidateExtraction',
system: prompt.system,
prompt: prompt.user,
schema: batchedTableDescriptionSchema,
temperature: this.settings.temperature,
});
structuredGenerationSucceeded = true;
tableDescription = generated.tableDescription.trim() || null;
const generatedColumns = new Map(
generated.columns.map((column) => [column.name.toLowerCase(), column.description.trim() || null]),
);
for (const column of input.table.columns) {
const description = generatedColumns.get(column.name.toLowerCase()) ?? null;
descriptions.set(column.name, description);
}
if (tableDescription && fallbackReason !== null) {
this.onWarning?.({
code: 'description_fallback_used',
message: `Generated table description without sample rows for ${input.table.name} (reason: ${fallbackReason})`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, reason: fallbackReason },
});
}
} catch (error) {
this.logger?.warn(`Batched table description failed for ${input.table.name}: ${errorMessage(error)}`, {
connectorId: input.connector.id,
table: input.table.name,
});
this.onWarning?.({
code: 'enrichment_failed',
message: `Failed to generate batched description for table ${input.table.name}: ${errorMessage(error)}`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id },
});
}
if (!structuredGenerationSucceeded) {
for (const column of input.table.columns) {
descriptions.set(column.name, null);
}
return { tableDescription, columnDescriptions: descriptions };
}
const tableContext = `Table: ${input.table.name} | Columns: ${input.table.columns.map((column) => column.name).join(', ')} | Data source: ${input.dataSourceType}`;
for (const column of input.table.columns) {
if (descriptions.get(column.name)) {
continue;
}
const fallback = await this.generateColumnDescriptionFromPreparedValues({
column,
columnValues: sampleValues.get(column.name) ?? [],
tableContext,
dataSourceType: input.dataSourceType,
supportsNestedAnalysis: input.supportsNestedAnalysis,
});
descriptions.set(column.name, fallback);
}
return { tableDescription, columnDescriptions: descriptions };
}
async generateDataSourceDescription(input: KtxGenerateDataSourceDescriptionInput): Promise<string | null> {
if (input.tables.length === 0) {
return 'No tables found in database';
@ -732,27 +982,13 @@ export class KtxDescriptionGenerator {
}
}
const nonNullValues = (columnValues ?? []).filter((value) => value !== null && value !== undefined);
const hasRawDescriptions = descriptionSources(column.rawDescriptions).length > 0;
if (nonNullValues.length === 0 && !hasRawDescriptions) {
return {
columnName: column.name,
description: null,
skipped: false,
processed: false,
};
}
const prompt = buildKtxColumnDescriptionPrompt({
columnName: column.name,
columnValues: nonNullValues,
const description = await this.generateColumnDescriptionFromPreparedValues({
column,
columnValues: columnValues ?? [],
tableContext,
dataSourceType: input.dataSourceType,
supportsNestedAnalysis: input.supportsNestedAnalysis,
rawDescriptions: column.rawDescriptions,
maxWords: this.settings.columnMaxWords,
});
const description = await this.generateAiDescription(prompt, 'ktx-column-description');
if (cacheKey && description) {
await this.cache?.set(cacheKey, description);
@ -782,6 +1018,30 @@ export class KtxDescriptionGenerator {
}
}
private async generateColumnDescriptionFromPreparedValues(input: {
column: KtxDescriptionColumn;
columnValues: unknown[];
tableContext: string;
dataSourceType: string;
supportsNestedAnalysis: boolean;
}): Promise<string | null> {
const nonNullValues = input.columnValues.filter((value) => value !== null && value !== undefined);
const hasRawDescriptions = descriptionSources(input.column.rawDescriptions).length > 0;
if (nonNullValues.length === 0 && !hasRawDescriptions) {
return null;
}
const prompt = buildKtxColumnDescriptionPrompt({
columnName: input.column.name,
columnValues: nonNullValues,
tableContext: input.tableContext,
dataSourceType: input.dataSourceType,
supportsNestedAnalysis: input.supportsNestedAnalysis,
rawDescriptions: input.column.rawDescriptions,
maxWords: this.settings.columnMaxWords,
});
return this.generateAiDescription(prompt, 'ktx-column-description');
}
private async generateAiDescription(prompt: KtxDescriptionPrompt, _operationName: string): Promise<string | null> {
try {
const text = await this.llmRuntime.generateText({

View file

@ -289,6 +289,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
maxLlmTablesPerBatch: 12,
maxCandidatesPerColumn: 7,
profileSampleRows: 500,
profileConcurrency: 3,
validationConcurrency: 2,
},
});
@ -378,6 +379,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
validationRequiredForManifest: true,
maxCandidatesPerColumn: 7,
profileSampleRows: 500,
profileConcurrency: 3,
validationConcurrency: 2,
},
profileWarnings: [],
@ -472,6 +474,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
dryRun: false,
@ -741,6 +744,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
dryRun: false,

View file

@ -382,6 +382,7 @@ export async function writeLocalScanEnrichmentArtifacts(
validationRequiredForManifest: input.relationshipSettings.validationRequiredForManifest,
maxCandidatesPerColumn: input.relationshipSettings.maxCandidatesPerColumn,
profileSampleRows: input.relationshipSettings.profileSampleRows,
profileConcurrency: input.relationshipSettings.profileConcurrency,
validationConcurrency: input.relationshipSettings.validationConcurrency,
}
: undefined,

View file

@ -299,6 +299,38 @@ describe('local scan enrichment', () => {
]);
});
it('uses the supplied snapshot without calling connector.introspect', async () => {
const scanConnector = connector();
const introspect = vi.mocked(scanConnector.introspect);
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'structural',
connector: scanConnector,
snapshot,
context: { runId: 'scan-run-snapshot' },
providers: null,
});
expect(result.snapshot).toEqual(snapshot);
expect(introspect).not.toHaveBeenCalled();
});
it('falls back to connector.introspect when no snapshot is supplied', async () => {
const scanConnector = connector();
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'structural',
connector: scanConnector,
context: { runId: 'scan-run-introspect' },
providers: null,
});
expect(result.snapshot).toEqual(snapshot);
expect(scanConnector.introspect).toHaveBeenCalledTimes(1);
});
it('runs deterministic relationship detection for relationship scans', async () => {
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
@ -473,7 +505,7 @@ describe('local scan enrichment', () => {
expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 });
});
it('generates table descriptions with bounded table-level concurrency', async () => {
it('generates batched table descriptions with bounded table-level concurrency', async () => {
const concurrentSnapshot: KtxSchemaSnapshot = {
...snapshot,
tables: Array.from({ length: 8 }, (_, index) => ({
@ -497,27 +529,27 @@ describe('local scan enrichment', () => {
],
})),
};
let activeColumnSamples = 0;
let maxActiveColumnSamples = 0;
let activeTableSamples = 0;
let maxActiveTableSamples = 0;
const scanConnector = {
...connector(),
introspect: vi.fn(async () => concurrentSnapshot),
sampleColumn: vi.fn(async () => {
activeColumnSamples += 1;
maxActiveColumnSamples = Math.max(maxActiveColumnSamples, activeColumnSamples);
sampleColumn: vi.fn(async () => ({
values: ['1'],
nullCount: 0,
distinctCount: 1,
})),
sampleTable: vi.fn(async () => {
activeTableSamples += 1;
maxActiveTableSamples = Math.max(maxActiveTableSamples, activeTableSamples);
await new Promise((resolve) => setTimeout(resolve, 10));
activeColumnSamples -= 1;
activeTableSamples -= 1;
return {
values: ['1'],
nullCount: 0,
distinctCount: 1,
headers: ['id'],
rows: [[1]],
totalRows: 1,
};
}),
sampleTable: vi.fn(async () => ({
headers: ['id'],
rows: [[1]],
totalRows: 1,
})),
};
const settings = {
...buildDefaultKtxProjectConfig().scan.relationships,
@ -533,7 +565,8 @@ describe('local scan enrichment', () => {
relationshipSettings: settings,
});
expect(maxActiveColumnSamples).toBe(6);
expect(maxActiveTableSamples).toBe(4);
expect(scanConnector.sampleColumn).not.toHaveBeenCalled();
});
it('reports enrichment progress for countable stages', async () => {
@ -675,7 +708,7 @@ describe('local scan enrichment', () => {
providerIdentity: { provider: 'fake', embeddingDimensions: 6 },
});
const generateText = vi.spyOn(providers.llmRuntime, 'generateText');
const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject');
const embedBatch = vi.spyOn(providers.embedding, 'embedBatch');
const second = await runLocalScanEnrichment({
connectionId: 'warehouse',
@ -693,7 +726,7 @@ describe('local scan enrichment', () => {
expect(first.state.resumedStages).toEqual([]);
expect(second.state.resumedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(second.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(generateText).not.toHaveBeenCalled();
expect(generateObject).not.toHaveBeenCalled();
expect(embedBatch).not.toHaveBeenCalled();
expect(second.descriptionUpdates).toEqual(first.descriptionUpdates);
expect(second.embeddingUpdates).toEqual(first.embeddingUpdates);
@ -731,7 +764,7 @@ describe('local scan enrichment', () => {
tables: [{ ...firstTable, name: 'customers' }],
})),
};
const generateText = vi.spyOn(providers.llmRuntime, 'generateText');
const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject');
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
@ -747,7 +780,7 @@ describe('local scan enrichment', () => {
expect(result.state.resumedStages).toEqual([]);
expect(result.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(generateText).toHaveBeenCalled();
expect(generateObject).toHaveBeenCalled();
});
it('runs providerless enriched scans as relationship-only discovery enrichment', async () => {

View file

@ -1,7 +1,7 @@
import pLimit from 'p-limit';
import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js';
import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js';
import { KtxDescriptionGenerator } from './description-generation.js';
import { buildKtxColumnEmbeddingText } from './embedding-text.js';
import {
completedKtxScanEnrichmentStateSummary,
@ -41,7 +41,7 @@ import type {
KtxTableRef,
} from './types.js';
const DESCRIPTION_TABLE_CONCURRENCY = 6;
const DESCRIPTION_TABLE_CONCURRENCY = 4;
export interface KtxLocalScanEnrichmentProviders {
llmRuntime: KtxLlmRuntimePort;
@ -53,6 +53,7 @@ export interface KtxLocalScanEnrichmentInput {
mode: KtxScanMode;
detectRelationships?: boolean;
connector: KtxScanConnector;
snapshot?: KtxSchemaSnapshot;
context: KtxScanContext;
providers: KtxLocalScanEnrichmentProviders | null;
stateStore?: KtxScanEnrichmentStateStore | null;
@ -179,7 +180,17 @@ function deterministicLlmRuntime(): KtxLlmRuntimePort {
async generateText(input) {
return `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'data source'}`;
},
async generateObject() {
async generateObject(input) {
if (input.prompt.includes('Sample rows:')) {
const columns = Array.from(input.prompt.matchAll(/^- ([^\s(]+)/gm), (match) => ({
name: match[1] ?? 'column',
description: `Deterministic description for ${match[1] ?? 'column'}`,
}));
return {
tableDescription: `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'table'}`,
columns,
} as never;
}
return { pkCandidates: [], fkCandidates: [] } as never;
},
async runAgentLoop() {
@ -234,30 +245,6 @@ export function snapshotToKtxEnrichedSchema(
};
}
function descriptionTable(table: KtxSchemaTable): KtxDescriptionColumnTable {
return {
catalog: table.catalog,
db: table.db,
name: table.name,
columns: table.columns.map((column) => ({
name: column.name,
...(column.comment ? { sampleValues: [column.comment], rawDescriptions: { db: column.comment } } : {}),
})),
};
}
function tableMetadataColumns(table: KtxSchemaTable): Array<{
name: string;
nativeType?: string | null;
comment?: string | null;
}> {
return table.columns.map((column) => ({
name: column.name,
nativeType: column.nativeType ?? null,
comment: column.comment ?? null,
}));
}
function embeddingBatchSize(maxBatchSize: number): number {
return Number.isInteger(maxBatchSize) && maxBatchSize > 0 ? maxBatchSize : 100;
}
@ -306,32 +293,28 @@ async function generateDescriptions(input: {
transient: true,
},
);
const tableInput = descriptionTable(table);
const columnResult = await generator.generateColumnDescriptions({
const batched = await generator.generateBatchedTableDescriptions({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis,
table: tableInput,
});
const tableDescription = await generator.generateTableDescription({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
table: {
catalog: table.catalog,
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
columns: tableMetadataColumns(table),
columns: table.columns.map((column) => ({
name: column.name,
type: column.nativeType,
...(column.comment ? { rawDescriptions: { db: column.comment } } : {}),
})),
},
});
return {
table: tableRef(table),
tableDescription,
columnDescriptions: Object.fromEntries(columnResult.columnDescriptions),
tableDescription: batched.tableDescription,
columnDescriptions: Object.fromEntries(batched.columnDescriptions),
};
}),
),
@ -472,15 +455,17 @@ export async function runLocalScanEnrichment(
): Promise<KtxLocalScanEnrichmentResult> {
const progress = input.context.progress;
await progress?.update(0, 'Loading enrichment schema snapshot');
const snapshot = await input.connector.introspect(
{
connectionId: input.connectionId,
driver: input.connector.driver,
mode: input.mode,
detectRelationships: input.detectRelationships,
},
input.context,
);
const snapshot =
input.snapshot ??
(await input.connector.introspect(
{
connectionId: input.connectionId,
driver: input.connector.driver,
mode: input.mode,
detectRelationships: input.detectRelationships,
},
input.context,
));
await progress?.update(0.05, `Loaded schema snapshot with ${snapshot.tables.length} tables`);
const now = input.now ?? (() => new Date());

View file

@ -126,7 +126,43 @@ async function writeDatabaseConfigWithoutIngestAdapters(projectDir: string): Pro
);
}
function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceAdapter {
function defaultFetchSnapshot(options: { extractedAt?: () => string } = {}): KtxSchemaSnapshot {
return {
connectionId: 'warehouse',
driver: 'postgres',
extractedAt: options.extractedAt?.() ?? '2026-04-29T09:00:00.000Z',
scope: { schemas: ['public'] },
metadata: {},
tables: [
{
name: 'orders',
catalog: null,
db: 'public',
kind: 'table',
comment: null,
estimatedRows: null,
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: null,
},
],
foreignKeys: [],
},
],
};
}
function fetchOnlyAdapter(options: { extractedAt?: () => string; snapshot?: KtxSchemaSnapshot } = {}): SourceAdapter {
const scanSnapshot = options.snapshot
? { ...options.snapshot, ...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}) }
: defaultFetchSnapshot(options);
return {
source: 'live-database',
skillNames: ['live_database_ingest'],
@ -135,39 +171,89 @@ function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceA
await writeFile(
join(stagedDir, 'connection.json'),
`${JSON.stringify({
connectionId: 'warehouse',
driver: 'postgres',
...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}),
scope: { schemas: ['public'] },
metadata: {},
connectionId: scanSnapshot.connectionId,
driver: scanSnapshot.driver,
extractedAt: scanSnapshot.extractedAt,
scope: scanSnapshot.scope,
metadata: scanSnapshot.metadata,
})}\n`,
'utf-8',
);
await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8');
await writeFile(
join(stagedDir, 'tables', 'orders.json'),
'{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":null,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n',
'utf-8',
);
for (const table of scanSnapshot.tables) {
await writeFile(join(stagedDir, 'tables', `${table.name}.json`), `${JSON.stringify(table)}\n`, 'utf-8');
}
},
async detect() {
return true;
},
async chunk() {
return {
workUnits: [
{
unitKey: 'live-database-public-orders',
rawFiles: ['tables/orders.json'],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
},
],
workUnits: scanSnapshot.tables.map((table) => ({
unitKey: `live-database-${table.db ?? 'default'}-${table.name}`,
rawFiles: [`tables/${table.name}.json`],
dependencyPaths: ['connection.json', 'foreign-keys.json'],
peerFileIndex: [],
})),
};
},
};
}
function nativeScanSnapshot(): KtxSchemaSnapshot {
return {
connectionId: 'warehouse',
driver: 'postgres',
extractedAt: '2026-04-29T09:00:00.000Z',
scope: { schemas: ['public'] },
metadata: {},
tables: [
{
catalog: null,
db: 'public',
name: 'orders',
kind: 'table',
comment: 'Orders',
estimatedRows: 1,
foreignKeys: [],
columns: [
{
name: 'id',
nativeType: 'integer',
normalizedType: 'integer',
dimensionType: 'number',
nullable: false,
primaryKey: true,
comment: 'Order id',
},
],
},
],
};
}
function nativeScanConnector(options: { cleanup?: () => Promise<void> } = {}): KtxScanConnector {
return {
id: 'test:warehouse',
driver: 'postgres',
capabilities: {
structuralIntrospection: true,
tableSampling: true,
columnSampling: true,
columnStats: false,
readOnlySql: false,
nestedAnalysis: false,
eventStreamDiscovery: false,
formalForeignKeys: false,
estimatedRowCounts: false,
},
introspect: vi.fn(async () => nativeScanSnapshot()),
sampleTable: vi.fn(async () => ({ headers: ['id'], rows: [[1]], totalRows: 1 })),
sampleColumn: vi.fn(async () => ({ values: ['1'], nullCount: 0, distinctCount: 1 })),
...(options.cleanup ? { cleanup: options.cleanup } : {}),
};
}
describe('local scan', () => {
let tempDir: string;
let project: KtxLocalProject;
@ -338,6 +424,59 @@ describe('local scan', () => {
});
});
it('threads the structural snapshot into enrichment without connector re-introspection', async () => {
project.config.scan.enrichment = { mode: 'deterministic' };
const connector = nativeScanConnector();
const introspect = vi.mocked(connector.introspect);
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
mode: 'enriched',
connector,
jobId: 'scan-enrichment-snapshot-threading',
now: () => new Date('2026-04-29T09:11:00.000Z'),
});
expect(result.report.enrichment.tableDescriptions).toBe('completed');
expect(introspect).not.toHaveBeenCalled();
});
it('cleans up a scan connector constructed by local scan', async () => {
const cleanup = vi.fn(async () => undefined);
await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
createConnector: vi.fn(async () => nativeScanConnector({ cleanup })),
jobId: 'scan-owned-connector-cleanup',
now: () => new Date('2026-04-29T09:13:00.000Z'),
});
expect(cleanup).toHaveBeenCalledTimes(1);
});
it('does not clean up a caller-supplied scan connector', async () => {
const cleanup = vi.fn(async () => undefined);
await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
connector: nativeScanConnector({ cleanup }),
jobId: 'scan-supplied-connector-cleanup',
now: () => new Date('2026-04-29T09:13:30.000Z'),
});
expect(cleanup).not.toHaveBeenCalled();
});
it('reuses scan report and raw-source paths when the same local scan run id is retried', async () => {
const first = await runLocalScan({
project,
@ -520,10 +659,11 @@ describe('local scan', () => {
};
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -607,10 +747,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -837,10 +978,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -946,10 +1088,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -1072,10 +1215,11 @@ describe('local scan', () => {
return relationshipSqlResult(input);
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'enriched',
connector,
@ -1202,10 +1346,11 @@ describe('local scan', () => {
return relationshipSqlResult(input, { throwOnCoverage: true });
},
};
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const result = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
@ -1337,7 +1482,8 @@ describe('local scan', () => {
join(project.projectDir, 'semantic-layer/warehouse/_schema/public.yaml'),
'utf-8',
);
expect(manifestRaw).toContain('ai: "Deterministic description');
expect(manifestRaw).toContain('ai: |-');
expect(manifestRaw).toContain('Deterministic description');
});
it('persists structural artifacts and a recoverable warning when standalone enrichment execution fails', async () => {
@ -1510,10 +1656,11 @@ describe('local scan', () => {
},
};
const llmRuntime = deterministicLlmRuntime();
const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() });
const first = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'enriched',
connector,
@ -1542,7 +1689,7 @@ describe('local scan', () => {
const generateObject = vi.spyOn(llmRuntime, 'generateObject');
const retry = await runLocalScan({
project,
adapters: [fetchOnlyAdapter()],
adapters: [adapter],
connectionId: 'warehouse',
mode: 'enriched',
connector,
@ -1568,7 +1715,6 @@ describe('local scan', () => {
failedStages: [],
});
expect(retry.report.enrichment.embeddings).toBe('completed');
expect(generateObject).toHaveBeenCalledTimes(1);
expect(generateObject).toHaveBeenCalledWith(expect.objectContaining({ role: 'candidateExtraction' }));
expect(embeddingAttempts).toBe(2);

View file

@ -30,6 +30,7 @@ import type {
KtxScanReport,
KtxScanTrigger,
KtxScanWarning,
KtxSchemaSnapshot,
} from './types.js';
function enrichmentResolutionWarning(
@ -388,6 +389,9 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
assertSupportedMode(mode);
await options.progress?.update(0.05, 'Preparing scan');
const rawConnector = await resolveScanConnector(options, mode);
const ownsConnector = !!rawConnector && !options.connector;
try {
const connection = options.project.config.connections[options.connectionId];
if (!connection) {
@ -454,6 +458,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
}
const enrichmentStateStore = connector ? createLocalScanEnrichmentStateStore(options) : null;
let enrichmentState: KtxScanEnrichmentStateSummary = completedKtxScanEnrichmentStateSummary();
let enrichmentSnapshot: KtxSchemaSnapshot | null = null;
if (!reusedExistingScanArtifacts && !report.dryRun && report.artifactPaths.rawSourcesDir) {
await options.progress?.update(0.7, 'Writing schema artifacts');
const rawSnapshot = await readLocalScanStructuralSnapshot({
@ -463,6 +468,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
rawSourcesDir: report.artifactPaths.rawSourcesDir,
extractedAtFallback: report.createdAt,
});
enrichmentSnapshot = rawSnapshot;
const manifestArtifacts = await writeLocalScanManifestShards({
project: options.project,
connectionId: options.connectionId,
@ -487,6 +493,7 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
mode,
detectRelationships: options.detectRelationships,
connector,
...(enrichmentSnapshot ? { snapshot: enrichmentSnapshot } : {}),
context: { runId: record.runId, progress: options.progress?.startPhase(0.18) },
providers: enrichmentProviders,
stateStore: enrichmentStateStore,
@ -557,6 +564,11 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise<LocalS
syncId: record.syncId,
report,
};
} finally {
if (ownsConnector) {
await rawConnector?.cleanup?.();
}
}
}
/** @internal */

View file

@ -70,6 +70,7 @@ interface KtxRelationshipDiagnosticsPolicy {
validationRequiredForManifest: boolean;
maxCandidatesPerColumn: number;
profileSampleRows: number;
profileConcurrency: number;
validationConcurrency: number;
}
@ -118,6 +119,7 @@ const DEFAULT_POLICY: KtxRelationshipDiagnosticsPolicy = {
validationRequiredForManifest: true,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
};

View file

@ -228,6 +228,7 @@ export async function discoverKtxRelationships(
executor,
ctx: input.context,
profileSampleRows: input.settings.profileSampleRows,
profileConcurrency: input.settings.profileConcurrency,
cache: profileCache,
});
const deterministicCandidates: KtxRelationshipDiscoveryCandidate[] = generateKtxRelationshipDiscoveryCandidates(

View file

@ -1,7 +1,7 @@
import { readFile } from 'node:fs/promises';
import { join } from 'node:path';
import Database from 'better-sqlite3';
import { afterEach, describe, expect, it } from 'vitest';
import { afterEach, describe, expect, it, vi } from 'vitest';
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
import { snapshotToKtxEnrichedSchema } from './local-enrichment.js';
import { loadKtxRelationshipBenchmarkFixture, maskKtxRelationshipBenchmarkSnapshot } from './relationship-benchmarks.js';
@ -351,4 +351,94 @@ describe('relationship profiling', () => {
scaleExecutor.close();
}
});
it('profiles tables concurrently up to profileConcurrency', async () => {
let inFlight = 0;
let maxInFlight = 0;
const executor = {
executeReadOnly: vi.fn(async (input: KtxReadOnlyQueryInput) => {
inFlight += 1;
maxInFlight = Math.max(maxInFlight, inFlight);
await new Promise((resolve) => setTimeout(resolve, 10));
inFlight -= 1;
return {
headers: [
'column_name',
'table_row_count',
'row_count',
'null_count',
'distinct_count',
'min_text_length',
'max_text_length',
'sample_values',
],
rows: [[input.sql.includes('accounts') ? 'id' : 'account_id', 2, 2, 0, 2, 1, 2, '1\u001f2']],
totalRows: 1,
rowCount: 1,
};
}),
};
await profileKtxRelationshipSchema({
connectionId: 'warehouse',
driver: 'sqlite',
schema: schemaWithTables(['accounts', 'orders', 'payments', 'refunds']),
executor,
ctx: { runId: 'profile-concurrency' },
profileConcurrency: 4,
});
expect(maxInFlight).toBe(4);
});
it('keeps profiling other tables when one table profile fails', async () => {
const executor = {
executeReadOnly: vi.fn(async (input: KtxReadOnlyQueryInput) => {
if (input.sql.includes('"orders"')) {
throw new Error('orders unavailable');
}
return {
headers: [
'column_name',
'table_row_count',
'row_count',
'null_count',
'distinct_count',
'min_text_length',
'max_text_length',
'sample_values',
],
rows: [['id', 2, 2, 0, 2, 1, 2, '1\u001f2']],
totalRows: 1,
rowCount: 1,
};
}),
};
const result = await profileKtxRelationshipSchema({
connectionId: 'warehouse',
driver: 'sqlite',
schema: schemaWithTables(['accounts', 'orders']),
executor,
ctx: { runId: 'profile-error-isolated' },
profileConcurrency: 2,
});
expect(result.warnings).toContain('profile_failed:orders:orders unavailable');
expect(result.tables).toHaveLength(2);
expect(Object.keys(result.columns)).toContain('accounts.id');
});
});
function schemaWithTables(names: string[]): KtxEnrichedSchema {
return schema(
names.map((name) =>
table(name, [
column(name, name === 'orders' ? 'account_id' : 'id', {
nullable: false,
primaryKey: name !== 'orders',
}),
]),
),
);
}

View file

@ -1,4 +1,5 @@
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
import { mapWithConcurrency } from './relationship-validation.js';
import type {
KtxConnectionDriver,
KtxQueryResult,
@ -60,6 +61,7 @@ export interface ProfileKtxRelationshipSchemaInput {
ctx: KtxScanContext;
sampleValuesPerColumn?: number;
profileSampleRows?: number;
profileConcurrency?: number;
cache?: KtxRelationshipProfileCache;
}
@ -389,6 +391,10 @@ async function queryTableProfile(input: {
};
}
type TableProfileResult =
| { tableProfile: Awaited<ReturnType<typeof queryTableProfile>> }
| { cached: KtxRelationshipCachedTableProfile; queryCount: 0 };
export async function profileKtxRelationshipSchema(
input: ProfileKtxRelationshipSchemaInput,
): Promise<KtxRelationshipProfileArtifact> {
@ -408,54 +414,68 @@ export async function profileKtxRelationshipSchema(
const tables: KtxRelationshipTableProfile[] = [];
const columns: Record<string, KtxRelationshipColumnProfile> = {};
const warnings: string[] = [];
const executor = input.executor;
for (const table of input.schema.tables.filter((candidate) => candidate.enabled)) {
const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5;
const profileSampleRows = input.profileSampleRows ?? 10000;
const cacheKey = tableProfileCacheKey({
connectionId: input.connectionId,
driver: input.driver,
ctx: input.ctx,
table: table.ref,
sampleValuesPerColumn,
profileSampleRows,
});
const cached = input.cache?.tableProfiles.get(cacheKey);
if (cached) {
tables.push(cached.table);
Object.assign(columns, cached.columns);
for (const warning of cached.warnings) {
warnings.push(warning);
}
continue;
}
try {
const tableProfile = await queryTableProfile({
const enabledTables = input.schema.tables.filter((candidate) => candidate.enabled);
const tableResults = await mapWithConcurrency<KtxEnrichedTable, TableProfileResult>(
enabledTables,
input.profileConcurrency ?? 4,
async (table) => {
const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5;
const profileSampleRows = input.profileSampleRows ?? 10000;
const cacheKey = tableProfileCacheKey({
connectionId: input.connectionId,
driver: input.driver,
table,
executor: input.executor,
ctx: input.ctx,
table: table.ref,
sampleValuesPerColumn,
profileSampleRows,
});
queryTotal += tableProfile.queryCount;
tables.push(tableProfile.table);
Object.assign(columns, tableProfile.columns);
input.cache?.tableProfiles.set(cacheKey, {
table: tableProfile.table,
columns: tableProfile.columns,
warnings: [],
});
} catch (error) {
const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`;
warnings.push(failureWarning);
input.cache?.tableProfiles.set(cacheKey, {
table: { table: table.ref, rowCount: 0 },
columns: {},
warnings: [failureWarning],
});
const cached = input.cache?.tableProfiles.get(cacheKey);
if (cached) {
return { cached, queryCount: 0 };
}
try {
const tableProfile = await queryTableProfile({
connectionId: input.connectionId,
driver: input.driver,
table,
executor,
ctx: input.ctx,
sampleValuesPerColumn,
profileSampleRows,
});
input.cache?.tableProfiles.set(cacheKey, {
table: tableProfile.table,
columns: tableProfile.columns,
warnings: [],
});
return { tableProfile };
} catch (error) {
const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`;
const cachedFailure = {
table: { table: table.ref, rowCount: 0 },
columns: {},
warnings: [failureWarning],
};
input.cache?.tableProfiles.set(cacheKey, cachedFailure);
return { cached: cachedFailure, queryCount: 0 };
}
},
);
for (const result of tableResults) {
if ('tableProfile' in result) {
queryTotal += result.tableProfile.queryCount;
tables.push(result.tableProfile.table);
Object.assign(columns, result.tableProfile.columns);
continue;
}
tables.push(result.cached.table);
Object.assign(columns, result.cached.columns);
for (const warning of result.cached.warnings) {
warnings.push(warning);
}
}

View file

@ -193,7 +193,7 @@ function statusFor(input: {
return 'rejected';
}
async function mapWithConcurrency<TInput, TOutput>(
export async function mapWithConcurrency<TInput, TOutput>(
inputs: readonly TInput[],
concurrency: number,
mapOne: (input: TInput) => Promise<TOutput>,

View file

@ -96,14 +96,17 @@ const createSnowflakeLiveDatabaseIntrospection = vi.hoisted(() =>
const isKtxSnowflakeConnectionConfig = vi.hoisted(() =>
vi.fn((connection: { driver?: string } | undefined) => connection?.driver === 'snowflake'),
);
const snowflakeConnectorInstances = vi.hoisted(() => [] as Array<{ cleanup: ReturnType<typeof vi.fn> }>);
const KtxSnowflakeScanConnector = vi.hoisted(
() =>
class {
readonly id: string;
readonly driver = 'snowflake';
readonly cleanup = vi.fn(async () => undefined);
constructor(options: { connectionId: string }) {
this.id = `snowflake:${options.connectionId}`;
snowflakeConnectorInstances.push(this);
}
},
);
@ -1047,6 +1050,95 @@ describe('runKtxScan', () => {
await rm(tempProject, { recursive: true, force: true });
});
it('cleans up a constructed scan connector after an enriched scan succeeds', async () => {
await initKtxProject({ projectDir: tempDir });
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' warehouse:',
' driver: snowflake',
' account: acct',
' warehouse: WH',
' database: ANALYTICS',
' schema_name: PUBLIC',
' username: reader',
' password: env:SNOWFLAKE_PASSWORD',
'',
].join('\n'),
'utf-8',
);
snowflakeConnectorInstances.length = 0;
const runLocalScan = vi.fn(async (): Promise<LocalScanRunResult> => ({
runId: 'scan-run-cleanup',
status: 'done',
done: true,
connectionId: 'warehouse',
mode: 'enriched',
dryRun: false,
syncId: 'sync-1',
report: { ...report, mode: 'enriched' },
}));
await expect(
runKtxScan(
{
command: 'run',
projectDir: tempDir,
connectionId: 'warehouse',
mode: 'enriched',
detectRelationships: false,
dryRun: false,
},
makeIo().io,
{ runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters },
),
).resolves.toBe(0);
expect(snowflakeConnectorInstances[0]?.cleanup).toHaveBeenCalledTimes(1);
});
it('cleans up a constructed scan connector after runLocalScan throws', async () => {
await initKtxProject({ projectDir: tempDir });
await writeFile(
join(tempDir, 'ktx.yaml'),
[
'connections:',
' warehouse:',
' driver: snowflake',
' account: acct',
' warehouse: WH',
' database: ANALYTICS',
' schema_name: PUBLIC',
' username: reader',
' password: env:SNOWFLAKE_PASSWORD',
'',
].join('\n'),
'utf-8',
);
snowflakeConnectorInstances.length = 0;
const runLocalScan = vi.fn(async () => {
throw new Error('scan failed');
});
await expect(
runKtxScan(
{
command: 'run',
projectDir: tempDir,
connectionId: 'warehouse',
mode: 'relationships',
detectRelationships: true,
dryRun: false,
},
makeIo().io,
{ runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters },
),
).resolves.toBe(1);
expect(snowflakeConnectorInstances[0]?.cleanup).toHaveBeenCalledTimes(1);
});
it('routes standalone postgres scans through the native connector before daemon fallback', async () => {
const tempProject = await mkdtemp(join(tmpdir(), 'ktx-scan-cli-native-postgres-'));
await initKtxProject({ projectDir: tempProject });

View file

@ -375,6 +375,7 @@ export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps
writeRunSummary(result.report, args.projectDir, io);
} finally {
cliProgress?.flush();
await connector?.cleanup?.();
}
return 0;
} catch (error) {

View file

@ -545,7 +545,7 @@ describe('setup databases step', () => {
},
{
driver: 'snowflake',
selectValues: ['no'],
selectValues: ['password', 'no'],
textValues: ['', 'env:SNOWFLAKE_ACCOUNT', 'ANALYTICS_WH', 'ANALYTICS', 'env:SNOWFLAKE_USER', ''],
passwordValues: ['env:SNOWFLAKE_PASSWORD'],
expectedTextPrompts: [
@ -2090,6 +2090,7 @@ describe('setup databases step', () => {
scanConnection: vi.fn(async () => 0),
historicSqlProbe,
prompts: makePromptAdapter({
selectValues: ['password'],
textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''],
passwordValues: ['env:SNOWFLAKE_PASSWORD'],
}),
@ -2131,6 +2132,51 @@ describe('setup databases step', () => {
expect(config.ingest.adapters).toEqual([]);
});
it('configures Snowflake with RSA key-pair auth via setup wizard', async () => {
const io = makeIo();
const result = await runKtxSetupDatabasesStep(
{
projectDir: tempDir,
inputMode: 'disabled',
databaseDrivers: ['snowflake'],
databaseConnectionId: 'snowflake',
databaseSchemas: [],
skipDatabases: false,
},
io.io,
{
testConnection: vi.fn(async () => 0),
scanConnection: vi.fn(async () => 0),
prompts: makePromptAdapter({
selectValues: ['rsa'],
textValues: [
'env:SNOWFLAKE_ACCOUNT',
'WH',
'ANALYTICS',
'reader',
'~/.ssh/snowflake_rsa_key.p8',
'',
],
passwordValues: ['env:SNOWFLAKE_KEY_PASS'],
}),
},
);
expect(result.status).toBe('ready');
const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8'));
expect(config.connections.snowflake).toMatchObject({
driver: 'snowflake',
authMethod: 'rsa',
account: 'env:SNOWFLAKE_ACCOUNT',
warehouse: 'WH',
database: 'ANALYTICS',
username: 'reader',
privateKey: 'file:~/.ssh/snowflake_rsa_key.p8', // pragma: allowlist secret
passphrase: 'env:SNOWFLAKE_KEY_PASS', // pragma: allowlist secret
});
expect(config.connections.snowflake.password).toBeUndefined();
});
it('writes Postgres query history config with minExecutions and ignores window/redaction output', async () => {
const io = makeIo();
const result = await runKtxSetupDatabasesStep(

View file

@ -1015,30 +1015,80 @@ async function buildConnectionConfig(input: {
stringConfigField(input.existingConnection, 'username'),
);
if (username === undefined) return 'back';
const passwordRef = await promptCredential({
prompts,
message: 'Snowflake password',
projectDir: args.projectDir,
connectionId: input.connectionId,
secretName: 'password', // pragma: allowlist secret
const authChoice = await prompts.select({
message: 'Snowflake authentication method',
options: [
{ value: 'password', label: 'Password' },
{ value: 'rsa', label: 'Key-pair (RSA / JWT)' },
{ value: 'back', label: 'Back' },
],
});
if (passwordRef === 'back') return 'back'; // pragma: allowlist secret
if (authChoice === 'back') return 'back';
const authMethod: 'password' | 'rsa' = authChoice === 'rsa' ? 'rsa' : 'password';
let passwordRef: string | null = null;
let privateKeyInput: string | undefined;
let passphraseRef: string | null = null;
if (authMethod === 'password') {
const ref = await promptCredential({
prompts,
message: 'Snowflake password',
projectDir: args.projectDir,
connectionId: input.connectionId,
secretName: 'password', // pragma: allowlist secret
});
if (ref === 'back') return 'back'; // pragma: allowlist secret
passwordRef = ref;
} else {
privateKeyInput = await promptText(
prompts,
'Path to Snowflake private key (PEM)\nFor example ~/.ssh/snowflake_rsa_key.p8, or $ENV_VAR / env:NAME / file:/abs/path.',
displayFileReference(stringConfigField(input.existingConnection, 'privateKey')),
);
if (privateKeyInput === undefined) return 'back';
const phr = await promptCredential({
prompts,
message: 'Private key passphrase (optional)\nPress Enter to skip.',
projectDir: args.projectDir,
connectionId: input.connectionId,
secretName: 'snowflake-passphrase', // pragma: allowlist secret
});
if (phr === 'back') return 'back';
passphraseRef = phr;
}
const role = await promptText(
prompts,
'Snowflake role (optional)\nPress Enter to skip.',
stringConfigField(input.existingConnection, 'role'),
);
if (role === undefined) return 'back';
const resolvedPasswordRef = passwordRef ?? stringConfigField(input.existingConnection, 'password');
if (!account || !warehouse || !database || !username || !resolvedPasswordRef) return null;
if (authMethod === 'password') {
const resolvedPasswordRef = passwordRef ?? stringConfigField(input.existingConnection, 'password');
if (!account || !warehouse || !database || !username || !resolvedPasswordRef) return null;
return {
driver: 'snowflake',
authMethod: 'password',
account,
warehouse,
database,
username,
password: resolvedPasswordRef,
...(role ? { role } : {}),
};
}
const resolvedPrivateKey = privateKeyInput
? normalizeFileReference(privateKeyInput)
: stringConfigField(input.existingConnection, 'privateKey');
if (!account || !warehouse || !database || !username || !resolvedPrivateKey) return null;
const resolvedPassphrase = passphraseRef ?? stringConfigField(input.existingConnection, 'passphrase');
return {
driver: 'snowflake',
authMethod: 'password',
authMethod: 'rsa',
account,
warehouse,
database,
username,
password: resolvedPasswordRef,
privateKey: resolvedPrivateKey,
...(resolvedPassphrase ? { passphrase: resolvedPassphrase } : {}),
...(role ? { role } : {}),
};
}

View file

@ -47,6 +47,7 @@ describe('buildProjectStackSnapshotFields', () => {
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
},