diff --git a/docs-site/content/docs/configuration/ktx-yaml.mdx b/docs-site/content/docs/configuration/ktx-yaml.mdx index 4008a45d..2220814a 100644 --- a/docs-site/content/docs/configuration/ktx-yaml.mdx +++ b/docs-site/content/docs/configuration/ktx-yaml.mdx @@ -157,6 +157,12 @@ connections: dataset_ids: [analytics, mart] ``` +For Snowflake connections, set `maxSessions` when deep ingest needs more or +fewer concurrent warehouse sessions. The default is `4`. This caps all +concurrent Snowflake SQL work for that connector instance, including schema +introspection, table sampling, relationship profiling, relationship +validation, and read-only SQL execution. + For Postgres, BigQuery, and Snowflake, `historicSql` and `context.queryHistory` toggle query-history ingest. The shape is connector-specific; the setup wizard writes these fields when you pass `--enable-query-history`. @@ -483,6 +489,7 @@ scan: maxLlmTablesPerBatch: 40 maxCandidatesPerColumn: 25 profileSampleRows: 10000 + profileConcurrency: 4 validationConcurrency: 4 validationBudget: all ``` @@ -510,6 +517,7 @@ the manifest. | `relationships.maxLlmTablesPerBatch` | `int > 0` | `40` | Max tables included in a single LLM relationship-proposal batch. | | `relationships.maxCandidatesPerColumn` | `int > 0` | `25` | Max join partners considered per column. | | `relationships.profileSampleRows` | `int > 0` | `10000` | Rows sampled per table when profiling values for relationship inference. | +| `relationships.profileConcurrency` | `int > 0` | `4` | Parallel relationship-profile queries against the database. For Snowflake, effective database concurrency is also bounded by the connection's `maxSessions`. | | `relationships.validationConcurrency` | `int > 0` | `4` | Parallel relationship validation queries against the database. | | `relationships.validationBudget` | `all` \| `int ≥ 0` | runtime default | Cap on validation queries per scan. `all` means unlimited. | diff --git a/packages/cli/src/connectors/snowflake/connector.test.ts b/packages/cli/src/connectors/snowflake/connector.test.ts index 58761066..b6d240c1 100644 --- a/packages/cli/src/connectors/snowflake/connector.test.ts +++ b/packages/cli/src/connectors/snowflake/connector.test.ts @@ -1,4 +1,12 @@ import { describe, expect, it, vi } from 'vitest'; + +const createPool = vi.hoisted(() => vi.fn()); + +vi.mock('snowflake-sdk', () => ({ + default: { createPool }, + createPool, +})); + import { createSnowflakeLiveDatabaseIntrospection } from '../../connectors/snowflake/live-database-introspection.js'; import { isKtxSnowflakeConnectionConfig, KtxSnowflakeScanConnector, snowflakeConnectionConfigFromConfig, type KtxSnowflakeDriver, type KtxSnowflakeDriverFactory } from '../../connectors/snowflake/connector.js'; import { tableRefSet } from '../../context/scan/table-ref.js'; @@ -64,6 +72,38 @@ function fakeDriverFactory(): KtxSnowflakeDriverFactory { return { createDriver: vi.fn(() => driver) }; } +function fakeSnowflakeStatement(headers: string[] = ['ONE']) { + return { + getColumns: () => headers.map((header) => ({ getName: () => header, getType: () => 'TEXT' })), + }; +} + +function installSnowflakePoolMock() { + const executedSql: string[] = []; + const connection = { + execute: vi.fn( + (input: { + sqlText: string; + complete: ( + error: Error | null, + statement: ReturnType, + rows: Array>, + ) => void; + }) => { + executedSql.push(input.sqlText); + input.complete(null, fakeSnowflakeStatement(), [{ ONE: 1 }]); + }, + ), + }; + const pool = { + use: vi.fn(async (fn: (conn: typeof connection) => Promise) => fn(connection)), + drain: vi.fn(async () => undefined), + clear: vi.fn(async () => undefined), + }; + createPool.mockReturnValue(pool); + return { connection, pool, executedSql }; +} + describe('KtxSnowflakeScanConnector', () => { it('resolves Snowflake connection configuration safely', () => { expect( @@ -100,6 +140,99 @@ describe('KtxSnowflakeScanConnector', () => { }); }); + it('defaults and validates Snowflake maxSessions', () => { + const baseConnection = { + driver: 'snowflake', + authMethod: 'password', + account: 'acct', + warehouse: 'WH', + database: 'ANALYTICS', + schema_name: 'PUBLIC', + username: 'reader', + password: 'fixture-pass', // pragma: allowlist secret + } as const; + + expect( + snowflakeConnectionConfigFromConfig({ + connectionId: 'warehouse', + connection: baseConnection, + }), + ).toMatchObject({ maxSessions: 4 }); + + expect( + snowflakeConnectionConfigFromConfig({ + connectionId: 'warehouse', + connection: { ...baseConnection, maxSessions: 8 }, + }), + ).toMatchObject({ maxSessions: 8 }); + + for (const maxSessions of [0, -1, 1.5, Number.NaN]) { + expect(() => + snowflakeConnectionConfigFromConfig({ + connectionId: 'warehouse', + connection: { ...baseConnection, maxSessions }, + }), + ).toThrow('connections.warehouse.maxSessions must be a positive integer'); + } + }); + + it('uses one lazy Snowflake pool and drains it during cleanup', async () => { + const { pool, executedSql } = installSnowflakePoolMock(); + const close = vi.fn(async () => undefined); + const connector = new KtxSnowflakeScanConnector({ + connectionId: 'warehouse', + connection: { + driver: 'snowflake', + authMethod: 'password', + account: 'acct', + warehouse: 'WH', + database: 'ANALYTICS', + schema_name: 'PUBLIC', + username: 'reader', + password: 'fixture-pass', // pragma: allowlist secret + role: 'ANALYST', + maxSessions: 3, + }, + sdkOptionsProvider: { + resolve: vi.fn(async () => ({ sdkOptions: { application: 'ktx-test' }, close })), + }, + }); + + expect(createPool).not.toHaveBeenCalled(); + + await connector.executeReadOnly({ connectionId: 'warehouse', sql: 'select 1', maxRows: 1 }, { runId: 'run-1' }); + await connector.executeReadOnly({ connectionId: 'warehouse', sql: 'select 1', maxRows: 1 }, { runId: 'run-1' }); + + expect(createPool).toHaveBeenCalledTimes(1); + expect(createPool).toHaveBeenCalledWith( + expect.objectContaining({ + account: 'acct', + username: 'reader', + warehouse: 'WH', + database: 'ANALYTICS', + schema: 'PUBLIC', + role: 'ANALYST', + password: 'fixture-pass', // pragma: allowlist secret + clientSessionKeepAlive: true, + clientSessionKeepAliveHeartbeatFrequency: 900, + application: 'ktx-test', + }), + expect.objectContaining({ + min: 0, + max: 3, + evictionRunIntervalMillis: 30_000, + acquireTimeoutMillis: 60_000, + }), + ); + expect(pool.use).toHaveBeenCalledTimes(2); + expect(executedSql.some((sql) => /^USE\s+/i.test(sql.trim()))).toBe(false); + + await connector.cleanup(); + expect(pool.drain).toHaveBeenCalledBefore(pool.clear); + expect(pool.clear).toHaveBeenCalledTimes(1); + expect(close).toHaveBeenCalledTimes(1); + }); + it('introspects schema, primary keys, comments, row counts, and dimensions', async () => { const connector = new KtxSnowflakeScanConnector({ connectionId: 'warehouse', diff --git a/packages/cli/src/connectors/snowflake/connector.ts b/packages/cli/src/connectors/snowflake/connector.ts index 25d5eaf3..91560e19 100644 --- a/packages/cli/src/connectors/snowflake/connector.ts +++ b/packages/cli/src/connectors/snowflake/connector.ts @@ -24,6 +24,7 @@ export interface KtxSnowflakeConnectionConfig { privateKey?: string; passphrase?: string; role?: string; + maxSessions?: number; [key: string]: unknown; } @@ -38,6 +39,7 @@ export interface KtxSnowflakeResolvedConnectionConfig { privateKey?: string; passphrase?: string; role?: string; + maxSessions: number; } export interface KtxSnowflakeRawColumnMetadata { @@ -132,6 +134,23 @@ function stringConfigValue( return typeof value === 'string' && value.trim().length > 0 ? resolveStringReference(value.trim(), env) : undefined; } +function positiveIntegerConfigValue(input: { + connection: KtxSnowflakeConnectionConfig; + key: keyof KtxSnowflakeConnectionConfig; + connectionId: string; + defaultValue: number; +}): number { + const value = input.connection[input.key]; + if (value === undefined) { + return input.defaultValue; + } + const numberValue = Number(value); + if (!Number.isInteger(numberValue) || numberValue < 1) { + throw new Error(`connections.${input.connectionId}.${String(input.key)} must be a positive integer`); + } + return numberValue; +} + function schemaNames(connection: KtxSnowflakeConnectionConfig, env: NodeJS.ProcessEnv): string[] { if (Array.isArray(connection.schema_names) && connection.schema_names.length > 0) { return connection.schema_names @@ -230,6 +249,12 @@ export function snowflakeConnectionConfigFromConfig(input: { database, schemas: resolvedSchemas, username, + maxSessions: positiveIntegerConfigValue({ + connection: input.connection, + key: 'maxSessions', + connectionId: input.connectionId, + defaultValue: 4, + }), }; const role = stringConfigValue(input.connection, 'role', env); if (role) { @@ -265,6 +290,7 @@ class DefaultSnowflakeDriverFactory implements KtxSnowflakeDriverFactory { class SnowflakeSdkDriver implements KtxSnowflakeDriver { private closeSdkOptions: Array<() => Promise> = []; + private pool: ReturnType | null = null; constructor( private readonly resolved: KtxSnowflakeResolvedConnectionConfig, @@ -285,16 +311,21 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver { } async query(sql: string, params?: unknown): Promise { - let connection: Connection | null = null; + const binds = Array.isArray(params) ? toSnowflakeBinds(params) : undefined; try { - connection = await this.createConnection(); - const binds = Array.isArray(params) ? toSnowflakeBinds(params) : undefined; - const result = await this.executeSnowflakeQuery(connection, sql, binds); + const pool = await this.getPool(); + const result = await pool.use(async (connection: snowflake.Connection) => + this.executeSnowflakeQuery(connection, sql, binds), + ); return { ...result, totalRows: result.rows.length, rowCount: result.rows.length }; - } finally { - if (connection) { - await this.destroyConnection(connection); + } catch (error) { + const message = error instanceof Error ? error.message : String(error); + if (/timeout/i.test(message) && /pool|acquire/i.test(message)) { + throw new Error( + "Snowflake session pool exhausted after 60s - consider lowering maxSessions or increasing your account's concurrent-statement limit.", + ); } + throw error; } } @@ -375,27 +406,41 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver { } async cleanup(): Promise { + const pool = this.pool; + this.pool = null; + if (pool) { + // Drain before clear so in-flight Snowflake statements finish before idle + // sessions are closed. + await pool.drain(); + await pool.clear(); + } const closers = this.closeSdkOptions; this.closeSdkOptions = []; - await Promise.all(closers.map((close) => close())); + await Promise.all(closers.map((close) => Promise.resolve(close()))); } private async runTest(): Promise<{ success: boolean; error?: string }> { - let connection: Connection | null = null; try { - connection = await this.createConnection(); - await this.executeSnowflakeQuery(connection, 'SELECT 1'); + await this.query('SELECT 1'); return { success: true }; } catch (error) { return { success: false, error: error instanceof Error ? error.message : String(error) }; - } finally { - if (connection) { - await this.destroyConnection(connection); - } } } - private async createConnection(): Promise { + private async getPool(): Promise> { + if (!this.pool) { + this.pool = snowflake.createPool(await this.resolveConnectionOptions(), { + min: 0, + max: this.resolved.maxSessions, + evictionRunIntervalMillis: 30_000, + acquireTimeoutMillis: 60_000, + }); + } + return this.pool; + } + + private async resolveConnectionOptions(): Promise { const patch = await this.sdkOptionsProvider?.resolve({ account: this.resolved.account, connection: { ...this.resolved, driver: 'snowflake' }, @@ -411,47 +456,13 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver { database: this.resolved.database, ...(sessionSchema ? { schema: sessionSchema } : {}), role: this.resolved.role, + clientSessionKeepAlive: true, + clientSessionKeepAliveHeartbeatFrequency: 900, ...patch?.sdkOptions, }; - const connectionConfig: ConnectionOptions = - this.resolved.authMethod === 'rsa' - ? { ...baseConfig, authenticator: 'SNOWFLAKE_JWT', privateKey: this.decryptPrivateKey() } - : { ...baseConfig, password: this.resolved.password }; - const connection = snowflake.createConnection(connectionConfig); - return new Promise((resolveConnection, rejectConnection) => { - connection.connect((error, connected) => { - if (error) { - rejectConnection(error); - return; - } - const resolvedConnection = connected ?? connection; - this.setConnectionContext(resolvedConnection).then( - () => resolveConnection(resolvedConnection), - (contextError) => { - resolvedConnection.destroy(() => undefined); - rejectConnection(contextError); - }, - ); - }); - }); - } - - private async setConnectionContext(connection: Connection): Promise { - if (this.resolved.role) { - await this.executeSnowflakeQuery(connection, `USE ROLE ${quoteSnowflakeIdentifier(this.resolved.role, 'role')}`); - } - await this.executeSnowflakeQuery( - connection, - `USE WAREHOUSE ${quoteSnowflakeIdentifier(this.resolved.warehouse, 'warehouse')}`, - ); - await this.executeSnowflakeQuery( - connection, - `USE DATABASE ${quoteSnowflakeIdentifier(this.resolved.database, 'database')}`, - ); - await this.executeSnowflakeQuery( - connection, - `USE SCHEMA ${quoteSnowflakeIdentifier(this.resolved.schemas[0] ?? 'PUBLIC', 'schema')}`, - ); + return this.resolved.authMethod === 'rsa' + ? { ...baseConfig, authenticator: 'SNOWFLAKE_JWT', privateKey: this.decryptPrivateKey() } + : { ...baseConfig, password: this.resolved.password }; } private async executeSnowflakeQuery( @@ -480,18 +491,6 @@ class SnowflakeSdkDriver implements KtxSnowflakeDriver { }); } - private destroyConnection(connection: Connection): Promise { - return new Promise((resolveDestroy, rejectDestroy) => { - connection.destroy((error) => { - if (error) { - rejectDestroy(error); - return; - } - resolveDestroy(); - }); - }); - } - private decryptPrivateKey(): string { if (!this.resolved.privateKey) { throw new Error('Private key is required for RSA authentication'); diff --git a/packages/cli/src/context/project/config.test.ts b/packages/cli/src/context/project/config.test.ts index 3b7f2feb..55188aa2 100644 --- a/packages/cli/src/context/project/config.test.ts +++ b/packages/cli/src/context/project/config.test.ts @@ -74,6 +74,7 @@ connections: maxLlmTablesPerBatch: 40, maxCandidatesPerColumn: 25, profileSampleRows: 10000, + profileConcurrency: 4, validationConcurrency: 4, }, }, @@ -278,6 +279,7 @@ scan: maxLlmTablesPerBatch: 12 maxCandidatesPerColumn: 7 profileSampleRows: 500 + profileConcurrency: 3 validationConcurrency: 2 validationBudget: 0 `); @@ -291,6 +293,7 @@ scan: maxLlmTablesPerBatch: 12, maxCandidatesPerColumn: 7, profileSampleRows: 500, + profileConcurrency: 3, validationConcurrency: 2, validationBudget: 0, }); @@ -302,6 +305,7 @@ scan: expect(serializeKtxProjectConfig(config)).toContain('maxLlmTablesPerBatch: 12'); expect(serializeKtxProjectConfig(config)).toContain('maxCandidatesPerColumn: 7'); expect(serializeKtxProjectConfig(config)).toContain('profileSampleRows: 500'); + expect(serializeKtxProjectConfig(config)).toContain('profileConcurrency: 3'); expect(serializeKtxProjectConfig(config)).toContain('validationConcurrency: 2'); expect(serializeKtxProjectConfig(config)).toContain('validationBudget: 0'); }); @@ -326,6 +330,7 @@ scan: maxLlmTablesPerBatch: 0 maxCandidatesPerColumn: -4 profileSampleRows: 0 + profileConcurrency: 0 validationConcurrency: 0 validationBudget: 1.5 `; @@ -341,6 +346,7 @@ scan: 'scan.relationships.maxLlmTablesPerBatch', 'scan.relationships.maxCandidatesPerColumn', 'scan.relationships.profileSampleRows', + 'scan.relationships.profileConcurrency', 'scan.relationships.validationConcurrency', 'scan.relationships.validationBudget', ]), diff --git a/packages/cli/src/context/project/config.ts b/packages/cli/src/context/project/config.ts index 2824ca59..e83f502e 100644 --- a/packages/cli/src/context/project/config.ts +++ b/packages/cli/src/context/project/config.ts @@ -163,6 +163,11 @@ const scanRelationshipsSchema = z .default(25) .describe('Maximum number of candidate join partners considered per column during relationship discovery.'), profileSampleRows: z.int().positive().default(10000).describe('Number of rows sampled per table when profiling values for relationship inference.'), + profileConcurrency: z + .int() + .positive() + .default(4) + .describe('Parallel relationship-profile queries run against the database during scan.'), validationConcurrency: z.int().positive().default(4).describe('Number of relationship validation queries run in parallel against the database.'), validationBudget: z .union([z.literal('all'), z.int().nonnegative()]) diff --git a/packages/cli/src/context/scan/description-generation.test.ts b/packages/cli/src/context/scan/description-generation.test.ts index e47d32be..bc7b1e25 100644 --- a/packages/cli/src/context/scan/description-generation.test.ts +++ b/packages/cli/src/context/scan/description-generation.test.ts @@ -378,6 +378,121 @@ describe('KtxDescriptionGenerator', () => { expect(cache.set).toHaveBeenCalledWith('warehouse.public.orders', 'Commerce orders'); expect(cache.set).toHaveBeenCalledWith('__connection:Warehouse', 'Commerce orders'); }); + + it('generates one structured table description and reuses table samples for all columns', async () => { + const llmRuntime = createLlmProvider('unused'); + llmRuntime.generateObject = vi.fn(async () => ({ + tableDescription: 'Commerce orders', + columns: [ + { name: 'status', description: 'Current order state' }, + { name: 'amount', description: 'Order amount in dollars' }, + ], + })); + const connector = createConnector(); + const generator = new KtxDescriptionGenerator({ + llmRuntime, + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateBatchedTableDescriptions({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + rawDescriptions: { db: 'Orders fact table' }, + columns: [ + { name: 'status', type: 'text' }, + { name: 'amount', type: 'numeric' }, + ], + }, + }); + + expect(result.tableDescription).toBe('Commerce orders'); + expect(Object.fromEntries(result.columnDescriptions)).toEqual({ + status: 'Current order state', + amount: 'Order amount in dollars', + }); + expect(connector.sampleTable).toHaveBeenCalledTimes(1); + expect(connector.sampleColumn).not.toHaveBeenCalled(); + expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1); + expect(llmRuntime.generateText).not.toHaveBeenCalled(); + }); + + it('falls back to one column generateText call for each missing structured column', async () => { + const llmRuntime = createLlmProvider('Fallback status'); + llmRuntime.generateObject = vi.fn(async () => ({ + tableDescription: 'Commerce orders', + columns: [{ name: 'amount', description: 'Order amount in dollars' }], + })); + const connector = createConnector(); + const generator = new KtxDescriptionGenerator({ + llmRuntime, + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateBatchedTableDescriptions({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [ + { name: 'status', type: 'text' }, + { name: 'amount', type: 'numeric' }, + ], + }, + }); + + expect(Object.fromEntries(result.columnDescriptions)).toEqual({ + status: 'Fallback status', + amount: 'Order amount in dollars', + }); + expect(connector.sampleColumn).not.toHaveBeenCalled(); + expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1); + expect(llmRuntime.generateText).toHaveBeenCalledTimes(1); + }); + + it('does not run per-column fallback when structured object generation throws', async () => { + const llmRuntime = createLlmProvider('Fallback description'); + llmRuntime.generateObject = vi.fn(async () => { + throw new Error('object output unavailable'); + }); + const warnings: string[] = []; + const generator = new KtxDescriptionGenerator({ + llmRuntime, + onWarning: (warning) => warnings.push(warning.code), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateBatchedTableDescriptions({ + connectionId: 'conn-1', + connector: createConnector(), + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [{ name: 'status', type: 'text' }], + }, + }); + + expect(result.tableDescription).toBeNull(); + expect(Object.fromEntries(result.columnDescriptions)).toEqual({ status: null }); + expect(warnings).toContain('enrichment_failed'); + expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1); + expect(llmRuntime.generateText).not.toHaveBeenCalled(); + }); }); describe('KtxDescriptionGenerator resilience', () => { diff --git a/packages/cli/src/context/scan/description-generation.ts b/packages/cli/src/context/scan/description-generation.ts index 640ae1a6..c6a41449 100644 --- a/packages/cli/src/context/scan/description-generation.ts +++ b/packages/cli/src/context/scan/description-generation.ts @@ -1,4 +1,5 @@ import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js'; +import { z } from 'zod'; import type { KtxColumnSampleInput, KtxColumnSampleResult, @@ -53,7 +54,7 @@ export interface KtxDescriptionColumn { sampleValues?: unknown[]; } -export interface KtxDescriptionColumnTable extends KtxTableRef { +interface KtxDescriptionColumnTable extends KtxTableRef { columns: KtxDescriptionColumn[]; } @@ -112,6 +113,23 @@ export interface KtxGenerateTableDescriptionInput { table: KtxDescriptionTableInput; } +export interface KtxGenerateBatchedTableDescriptionsInput { + connectionId: string; + connector: KtxDescriptionSamplingPort; + context: KtxScanContext; + dataSourceType: string; + supportsNestedAnalysis: boolean; + table: KtxDescriptionColumnTable & { + rawDescriptions?: Record; + columns: Array; + }; +} + +export interface KtxBatchedTableDescriptionsResult { + tableDescription: string | null; + columnDescriptions: Map; +} + export interface KtxGenerateDataSourceDescriptionInput { connectionId: string; connector: KtxDescriptionSamplingPort; @@ -136,6 +154,18 @@ interface ColumnTaskResult { skipped: boolean; } +const batchedTableDescriptionSchema = z.object({ + tableDescription: z.string(), + columns: z.array( + z.object({ + name: z.string(), + description: z.string(), + }), + ), +}); + +type BatchedTableDescriptionOutput = z.infer; + function descriptionSources(rawDescriptions: Record | undefined): Array<[string, string]> { if (!rawDescriptions) { return []; @@ -250,6 +280,76 @@ function wordLimitLine(maxWords: number): string { return `Please provide a concise description in ${maxWords} words or less.`; } +function sampleValuesByColumn( + columns: readonly KtxDescriptionColumn[], + sampleData: KtxTableSampleResult | null, +): Map { + const values = new Map(); + for (const column of columns) { + const existingValues = column.sampleValues?.filter((value) => value !== null && value !== undefined) ?? []; + if (existingValues.length > 0) { + values.set(column.name, existingValues); + } + } + if (!sampleData) { + return values; + } + for (const column of columns) { + const index = sampleData.headers.findIndex((header) => header.toLowerCase() === column.name.toLowerCase()); + if (index < 0) { + continue; + } + const sampledValues = sampleData.rows + .map((row) => row[index]) + .filter((value) => value !== null && value !== undefined); + if (sampledValues.length > 0) { + values.set(column.name, sampledValues); + } + } + return values; +} + +function batchedPrompt(input: { + table: KtxGenerateBatchedTableDescriptionsInput['table']; + sampleData: KtxTableSampleResult | null; + dataSourceType: string; + tableMaxWords: number; + columnMaxWords: number; +}): KtxDescriptionPrompt { + const columnLines = input.table.columns + .map((column) => { + const typePart = column.type ? ` (${column.type})` : ''; + const commentPart = column.rawDescriptions?.db ? ` - ${column.rawDescriptions.db}` : ''; + return `- ${column.name}${typePart}${commentPart}`; + }) + .join('\n'); + const sampleLines = + input.sampleData && input.sampleData.rows.length > 0 + ? input.sampleData.rows + .slice(0, 5) + .map((row) => + input.sampleData!.headers.map((header, index) => `${header}=${String(row[index] ?? '')}`).join(', '), + ) + .join('\n') + : 'unavailable'; + return { + system: [ + 'Analyze one database table and return structured JSON matching the supplied schema.', + `The table description must be ${input.tableMaxWords} words or less.`, + `Each column description must be ${input.columnMaxWords} words or less.`, + 'Describe business meaning directly. Do not repeat table or column names as filler.', + ].join('\n'), + user: [ + `Table: ${input.table.name}`, + `Data source type: ${input.dataSourceType}`, + 'Columns:', + columnLines, + 'Sample rows:', + sampleLines, + ].join('\n'), + }; +} + /** @internal */ export function buildKtxColumnDescriptionPrompt( input: KtxColumnDescriptionPromptInput & { maxWords?: number }, @@ -582,6 +682,156 @@ export class KtxDescriptionGenerator { } } + async generateBatchedTableDescriptions( + input: KtxGenerateBatchedTableDescriptionsInput, + ): Promise { + const tableRef = toTableRef(input.table); + let sampleData: KtxTableSampleResult | null = null; + let fallbackReason: 'capability_missing' | 'sampling_failed' | 'empty_sample' | null = null; + if (!input.connector.sampleTable) { + fallbackReason = 'capability_missing'; + this.logger?.warn('KTX scan connector does not support table sampling; falling back to metadata-only prompt', { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'connector_capability_missing', + message: `Connector ${input.connector.id} does not support sampleTable; using metadata-only description prompt`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, capability: 'sampleTable' }, + }); + } else { + try { + sampleData = await retryAsync( + () => + input.connector.sampleTable!( + { + connectionId: input.connectionId, + table: tableRef, + limit: 20, + }, + input.context, + ), + { + attempts: 3, + baseDelayMs: 200, + signal: input.context.signal, + onAttemptFailure: (error, attempt) => { + this.logger?.warn(`sampleTable attempt ${attempt} failed for ${input.table.name}: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + attempt, + }); + }, + }, + ); + if (sampleData.rows.length === 0) { + fallbackReason = 'empty_sample'; + this.logger?.warn('sampleTable returned no rows; using metadata-only prompt', { + connectorId: input.connector.id, + table: input.table.name, + }); + } + } catch (error) { + if (error instanceof KtxAbortedError) { + throw error; + } + fallbackReason = 'sampling_failed'; + this.logger?.error(`sampleTable exhausted retries for ${input.table.name}: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'sampling_failed', + message: `Failed to sample table ${input.table.name} after retries: ${errorMessage(error)}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, error: errorMessage(error) }, + }); + } + } + + const sampleValues = sampleValuesByColumn(input.table.columns, sampleData); + const descriptions = new Map(); + let tableDescription: string | null = null; + let structuredGenerationSucceeded = false; + + try { + const prompt = batchedPrompt({ + table: input.table, + sampleData, + dataSourceType: input.dataSourceType, + tableMaxWords: this.settings.tableMaxWords, + columnMaxWords: this.settings.columnMaxWords, + }); + const generated = await this.llmRuntime.generateObject< + BatchedTableDescriptionOutput, + typeof batchedTableDescriptionSchema + >({ + role: 'candidateExtraction', + system: prompt.system, + prompt: prompt.user, + schema: batchedTableDescriptionSchema, + temperature: this.settings.temperature, + }); + structuredGenerationSucceeded = true; + tableDescription = generated.tableDescription.trim() || null; + const generatedColumns = new Map( + generated.columns.map((column) => [column.name.toLowerCase(), column.description.trim() || null]), + ); + for (const column of input.table.columns) { + const description = generatedColumns.get(column.name.toLowerCase()) ?? null; + descriptions.set(column.name, description); + } + if (tableDescription && fallbackReason !== null) { + this.onWarning?.({ + code: 'description_fallback_used', + message: `Generated table description without sample rows for ${input.table.name} (reason: ${fallbackReason})`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, reason: fallbackReason }, + }); + } + } catch (error) { + this.logger?.warn(`Batched table description failed for ${input.table.name}: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'enrichment_failed', + message: `Failed to generate batched description for table ${input.table.name}: ${errorMessage(error)}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id }, + }); + } + + if (!structuredGenerationSucceeded) { + for (const column of input.table.columns) { + descriptions.set(column.name, null); + } + return { tableDescription, columnDescriptions: descriptions }; + } + + const tableContext = `Table: ${input.table.name} | Columns: ${input.table.columns.map((column) => column.name).join(', ')} | Data source: ${input.dataSourceType}`; + for (const column of input.table.columns) { + if (descriptions.get(column.name)) { + continue; + } + const fallback = await this.generateColumnDescriptionFromPreparedValues({ + column, + columnValues: sampleValues.get(column.name) ?? [], + tableContext, + dataSourceType: input.dataSourceType, + supportsNestedAnalysis: input.supportsNestedAnalysis, + }); + descriptions.set(column.name, fallback); + } + + return { tableDescription, columnDescriptions: descriptions }; + } + async generateDataSourceDescription(input: KtxGenerateDataSourceDescriptionInput): Promise { if (input.tables.length === 0) { return 'No tables found in database'; @@ -732,27 +982,13 @@ export class KtxDescriptionGenerator { } } - const nonNullValues = (columnValues ?? []).filter((value) => value !== null && value !== undefined); - const hasRawDescriptions = descriptionSources(column.rawDescriptions).length > 0; - if (nonNullValues.length === 0 && !hasRawDescriptions) { - return { - columnName: column.name, - description: null, - skipped: false, - processed: false, - }; - } - - const prompt = buildKtxColumnDescriptionPrompt({ - columnName: column.name, - columnValues: nonNullValues, + const description = await this.generateColumnDescriptionFromPreparedValues({ + column, + columnValues: columnValues ?? [], tableContext, dataSourceType: input.dataSourceType, supportsNestedAnalysis: input.supportsNestedAnalysis, - rawDescriptions: column.rawDescriptions, - maxWords: this.settings.columnMaxWords, }); - const description = await this.generateAiDescription(prompt, 'ktx-column-description'); if (cacheKey && description) { await this.cache?.set(cacheKey, description); @@ -782,6 +1018,30 @@ export class KtxDescriptionGenerator { } } + private async generateColumnDescriptionFromPreparedValues(input: { + column: KtxDescriptionColumn; + columnValues: unknown[]; + tableContext: string; + dataSourceType: string; + supportsNestedAnalysis: boolean; + }): Promise { + const nonNullValues = input.columnValues.filter((value) => value !== null && value !== undefined); + const hasRawDescriptions = descriptionSources(input.column.rawDescriptions).length > 0; + if (nonNullValues.length === 0 && !hasRawDescriptions) { + return null; + } + const prompt = buildKtxColumnDescriptionPrompt({ + columnName: input.column.name, + columnValues: nonNullValues, + tableContext: input.tableContext, + dataSourceType: input.dataSourceType, + supportsNestedAnalysis: input.supportsNestedAnalysis, + rawDescriptions: input.column.rawDescriptions, + maxWords: this.settings.columnMaxWords, + }); + return this.generateAiDescription(prompt, 'ktx-column-description'); + } + private async generateAiDescription(prompt: KtxDescriptionPrompt, _operationName: string): Promise { try { const text = await this.llmRuntime.generateText({ diff --git a/packages/cli/src/context/scan/local-enrichment-artifacts.test.ts b/packages/cli/src/context/scan/local-enrichment-artifacts.test.ts index 56994568..8a49fc78 100644 --- a/packages/cli/src/context/scan/local-enrichment-artifacts.test.ts +++ b/packages/cli/src/context/scan/local-enrichment-artifacts.test.ts @@ -289,6 +289,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => { maxLlmTablesPerBatch: 12, maxCandidatesPerColumn: 7, profileSampleRows: 500, + profileConcurrency: 3, validationConcurrency: 2, }, }); @@ -378,6 +379,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => { validationRequiredForManifest: true, maxCandidatesPerColumn: 7, profileSampleRows: 500, + profileConcurrency: 3, validationConcurrency: 2, }, profileWarnings: [], @@ -472,6 +474,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => { maxLlmTablesPerBatch: 40, maxCandidatesPerColumn: 25, profileSampleRows: 10000, + profileConcurrency: 4, validationConcurrency: 4, }, dryRun: false, @@ -741,6 +744,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => { maxLlmTablesPerBatch: 40, maxCandidatesPerColumn: 25, profileSampleRows: 10000, + profileConcurrency: 4, validationConcurrency: 4, }, dryRun: false, diff --git a/packages/cli/src/context/scan/local-enrichment-artifacts.ts b/packages/cli/src/context/scan/local-enrichment-artifacts.ts index 3a2d15f6..9de46a98 100644 --- a/packages/cli/src/context/scan/local-enrichment-artifacts.ts +++ b/packages/cli/src/context/scan/local-enrichment-artifacts.ts @@ -382,6 +382,7 @@ export async function writeLocalScanEnrichmentArtifacts( validationRequiredForManifest: input.relationshipSettings.validationRequiredForManifest, maxCandidatesPerColumn: input.relationshipSettings.maxCandidatesPerColumn, profileSampleRows: input.relationshipSettings.profileSampleRows, + profileConcurrency: input.relationshipSettings.profileConcurrency, validationConcurrency: input.relationshipSettings.validationConcurrency, } : undefined, diff --git a/packages/cli/src/context/scan/local-enrichment.test.ts b/packages/cli/src/context/scan/local-enrichment.test.ts index 66b66fc2..9647c8b9 100644 --- a/packages/cli/src/context/scan/local-enrichment.test.ts +++ b/packages/cli/src/context/scan/local-enrichment.test.ts @@ -299,6 +299,38 @@ describe('local scan enrichment', () => { ]); }); + it('uses the supplied snapshot without calling connector.introspect', async () => { + const scanConnector = connector(); + const introspect = vi.mocked(scanConnector.introspect); + + const result = await runLocalScanEnrichment({ + connectionId: 'warehouse', + mode: 'structural', + connector: scanConnector, + snapshot, + context: { runId: 'scan-run-snapshot' }, + providers: null, + }); + + expect(result.snapshot).toEqual(snapshot); + expect(introspect).not.toHaveBeenCalled(); + }); + + it('falls back to connector.introspect when no snapshot is supplied', async () => { + const scanConnector = connector(); + + const result = await runLocalScanEnrichment({ + connectionId: 'warehouse', + mode: 'structural', + connector: scanConnector, + context: { runId: 'scan-run-introspect' }, + providers: null, + }); + + expect(result.snapshot).toEqual(snapshot); + expect(scanConnector.introspect).toHaveBeenCalledTimes(1); + }); + it('runs deterministic relationship detection for relationship scans', async () => { const result = await runLocalScanEnrichment({ connectionId: 'warehouse', @@ -473,7 +505,7 @@ describe('local scan enrichment', () => { expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 }); }); - it('generates table descriptions with bounded table-level concurrency', async () => { + it('generates batched table descriptions with bounded table-level concurrency', async () => { const concurrentSnapshot: KtxSchemaSnapshot = { ...snapshot, tables: Array.from({ length: 8 }, (_, index) => ({ @@ -497,27 +529,27 @@ describe('local scan enrichment', () => { ], })), }; - let activeColumnSamples = 0; - let maxActiveColumnSamples = 0; + let activeTableSamples = 0; + let maxActiveTableSamples = 0; const scanConnector = { ...connector(), introspect: vi.fn(async () => concurrentSnapshot), - sampleColumn: vi.fn(async () => { - activeColumnSamples += 1; - maxActiveColumnSamples = Math.max(maxActiveColumnSamples, activeColumnSamples); + sampleColumn: vi.fn(async () => ({ + values: ['1'], + nullCount: 0, + distinctCount: 1, + })), + sampleTable: vi.fn(async () => { + activeTableSamples += 1; + maxActiveTableSamples = Math.max(maxActiveTableSamples, activeTableSamples); await new Promise((resolve) => setTimeout(resolve, 10)); - activeColumnSamples -= 1; + activeTableSamples -= 1; return { - values: ['1'], - nullCount: 0, - distinctCount: 1, + headers: ['id'], + rows: [[1]], + totalRows: 1, }; }), - sampleTable: vi.fn(async () => ({ - headers: ['id'], - rows: [[1]], - totalRows: 1, - })), }; const settings = { ...buildDefaultKtxProjectConfig().scan.relationships, @@ -533,7 +565,8 @@ describe('local scan enrichment', () => { relationshipSettings: settings, }); - expect(maxActiveColumnSamples).toBe(6); + expect(maxActiveTableSamples).toBe(4); + expect(scanConnector.sampleColumn).not.toHaveBeenCalled(); }); it('reports enrichment progress for countable stages', async () => { @@ -675,7 +708,7 @@ describe('local scan enrichment', () => { providerIdentity: { provider: 'fake', embeddingDimensions: 6 }, }); - const generateText = vi.spyOn(providers.llmRuntime, 'generateText'); + const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject'); const embedBatch = vi.spyOn(providers.embedding, 'embedBatch'); const second = await runLocalScanEnrichment({ connectionId: 'warehouse', @@ -693,7 +726,7 @@ describe('local scan enrichment', () => { expect(first.state.resumedStages).toEqual([]); expect(second.state.resumedStages).toEqual(['descriptions', 'embeddings', 'relationships']); expect(second.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']); - expect(generateText).not.toHaveBeenCalled(); + expect(generateObject).not.toHaveBeenCalled(); expect(embedBatch).not.toHaveBeenCalled(); expect(second.descriptionUpdates).toEqual(first.descriptionUpdates); expect(second.embeddingUpdates).toEqual(first.embeddingUpdates); @@ -731,7 +764,7 @@ describe('local scan enrichment', () => { tables: [{ ...firstTable, name: 'customers' }], })), }; - const generateText = vi.spyOn(providers.llmRuntime, 'generateText'); + const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject'); const result = await runLocalScanEnrichment({ connectionId: 'warehouse', @@ -747,7 +780,7 @@ describe('local scan enrichment', () => { expect(result.state.resumedStages).toEqual([]); expect(result.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']); - expect(generateText).toHaveBeenCalled(); + expect(generateObject).toHaveBeenCalled(); }); it('runs providerless enriched scans as relationship-only discovery enrichment', async () => { diff --git a/packages/cli/src/context/scan/local-enrichment.ts b/packages/cli/src/context/scan/local-enrichment.ts index 680f8f60..545b2ad6 100644 --- a/packages/cli/src/context/scan/local-enrichment.ts +++ b/packages/cli/src/context/scan/local-enrichment.ts @@ -1,7 +1,7 @@ import pLimit from 'p-limit'; import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js'; import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js'; -import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js'; +import { KtxDescriptionGenerator } from './description-generation.js'; import { buildKtxColumnEmbeddingText } from './embedding-text.js'; import { completedKtxScanEnrichmentStateSummary, @@ -41,7 +41,7 @@ import type { KtxTableRef, } from './types.js'; -const DESCRIPTION_TABLE_CONCURRENCY = 6; +const DESCRIPTION_TABLE_CONCURRENCY = 4; export interface KtxLocalScanEnrichmentProviders { llmRuntime: KtxLlmRuntimePort; @@ -53,6 +53,7 @@ export interface KtxLocalScanEnrichmentInput { mode: KtxScanMode; detectRelationships?: boolean; connector: KtxScanConnector; + snapshot?: KtxSchemaSnapshot; context: KtxScanContext; providers: KtxLocalScanEnrichmentProviders | null; stateStore?: KtxScanEnrichmentStateStore | null; @@ -179,7 +180,17 @@ function deterministicLlmRuntime(): KtxLlmRuntimePort { async generateText(input) { return `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'data source'}`; }, - async generateObject() { + async generateObject(input) { + if (input.prompt.includes('Sample rows:')) { + const columns = Array.from(input.prompt.matchAll(/^- ([^\s(]+)/gm), (match) => ({ + name: match[1] ?? 'column', + description: `Deterministic description for ${match[1] ?? 'column'}`, + })); + return { + tableDescription: `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'table'}`, + columns, + } as never; + } return { pkCandidates: [], fkCandidates: [] } as never; }, async runAgentLoop() { @@ -234,30 +245,6 @@ export function snapshotToKtxEnrichedSchema( }; } -function descriptionTable(table: KtxSchemaTable): KtxDescriptionColumnTable { - return { - catalog: table.catalog, - db: table.db, - name: table.name, - columns: table.columns.map((column) => ({ - name: column.name, - ...(column.comment ? { sampleValues: [column.comment], rawDescriptions: { db: column.comment } } : {}), - })), - }; -} - -function tableMetadataColumns(table: KtxSchemaTable): Array<{ - name: string; - nativeType?: string | null; - comment?: string | null; -}> { - return table.columns.map((column) => ({ - name: column.name, - nativeType: column.nativeType ?? null, - comment: column.comment ?? null, - })); -} - function embeddingBatchSize(maxBatchSize: number): number { return Number.isInteger(maxBatchSize) && maxBatchSize > 0 ? maxBatchSize : 100; } @@ -306,32 +293,28 @@ async function generateDescriptions(input: { transient: true, }, ); - const tableInput = descriptionTable(table); - const columnResult = await generator.generateColumnDescriptions({ + const batched = await generator.generateBatchedTableDescriptions({ connectionId: input.snapshot.connectionId, connector: input.connector, context: input.context, dataSourceType: input.snapshot.driver, supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis, - table: tableInput, - }); - const tableDescription = await generator.generateTableDescription({ - connectionId: input.snapshot.connectionId, - connector: input.connector, - context: input.context, - dataSourceType: input.snapshot.driver, table: { catalog: table.catalog, db: table.db, name: table.name, rawDescriptions: table.comment ? { db: table.comment } : {}, - columns: tableMetadataColumns(table), + columns: table.columns.map((column) => ({ + name: column.name, + type: column.nativeType, + ...(column.comment ? { rawDescriptions: { db: column.comment } } : {}), + })), }, }); return { table: tableRef(table), - tableDescription, - columnDescriptions: Object.fromEntries(columnResult.columnDescriptions), + tableDescription: batched.tableDescription, + columnDescriptions: Object.fromEntries(batched.columnDescriptions), }; }), ), @@ -472,15 +455,17 @@ export async function runLocalScanEnrichment( ): Promise { const progress = input.context.progress; await progress?.update(0, 'Loading enrichment schema snapshot'); - const snapshot = await input.connector.introspect( - { - connectionId: input.connectionId, - driver: input.connector.driver, - mode: input.mode, - detectRelationships: input.detectRelationships, - }, - input.context, - ); + const snapshot = + input.snapshot ?? + (await input.connector.introspect( + { + connectionId: input.connectionId, + driver: input.connector.driver, + mode: input.mode, + detectRelationships: input.detectRelationships, + }, + input.context, + )); await progress?.update(0.05, `Loaded schema snapshot with ${snapshot.tables.length} tables`); const now = input.now ?? (() => new Date()); diff --git a/packages/cli/src/context/scan/local-scan.test.ts b/packages/cli/src/context/scan/local-scan.test.ts index 0c397322..7b5af5b0 100644 --- a/packages/cli/src/context/scan/local-scan.test.ts +++ b/packages/cli/src/context/scan/local-scan.test.ts @@ -126,7 +126,43 @@ async function writeDatabaseConfigWithoutIngestAdapters(projectDir: string): Pro ); } -function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceAdapter { +function defaultFetchSnapshot(options: { extractedAt?: () => string } = {}): KtxSchemaSnapshot { + return { + connectionId: 'warehouse', + driver: 'postgres', + extractedAt: options.extractedAt?.() ?? '2026-04-29T09:00:00.000Z', + scope: { schemas: ['public'] }, + metadata: {}, + tables: [ + { + name: 'orders', + catalog: null, + db: 'public', + kind: 'table', + comment: null, + estimatedRows: null, + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: true, + comment: null, + }, + ], + foreignKeys: [], + }, + ], + }; +} + +function fetchOnlyAdapter(options: { extractedAt?: () => string; snapshot?: KtxSchemaSnapshot } = {}): SourceAdapter { + const scanSnapshot = options.snapshot + ? { ...options.snapshot, ...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}) } + : defaultFetchSnapshot(options); + return { source: 'live-database', skillNames: ['live_database_ingest'], @@ -135,39 +171,89 @@ function fetchOnlyAdapter(options: { extractedAt?: () => string } = {}): SourceA await writeFile( join(stagedDir, 'connection.json'), `${JSON.stringify({ - connectionId: 'warehouse', - driver: 'postgres', - ...(options.extractedAt ? { extractedAt: options.extractedAt() } : {}), - scope: { schemas: ['public'] }, - metadata: {}, + connectionId: scanSnapshot.connectionId, + driver: scanSnapshot.driver, + extractedAt: scanSnapshot.extractedAt, + scope: scanSnapshot.scope, + metadata: scanSnapshot.metadata, })}\n`, 'utf-8', ); await writeFile(join(stagedDir, 'foreign-keys.json'), '{"foreignKeys":[]}\n', 'utf-8'); - await writeFile( - join(stagedDir, 'tables', 'orders.json'), - '{"name":"orders","catalog":null,"db":"public","kind":"table","comment":null,"estimatedRows":null,"columns":[{"name":"id","nativeType":"integer","normalizedType":"integer","dimensionType":"number","nullable":false,"primaryKey":true,"comment":null}],"foreignKeys":[]}\n', - 'utf-8', - ); + for (const table of scanSnapshot.tables) { + await writeFile(join(stagedDir, 'tables', `${table.name}.json`), `${JSON.stringify(table)}\n`, 'utf-8'); + } }, async detect() { return true; }, async chunk() { return { - workUnits: [ - { - unitKey: 'live-database-public-orders', - rawFiles: ['tables/orders.json'], - dependencyPaths: ['connection.json', 'foreign-keys.json'], - peerFileIndex: [], - }, - ], + workUnits: scanSnapshot.tables.map((table) => ({ + unitKey: `live-database-${table.db ?? 'default'}-${table.name}`, + rawFiles: [`tables/${table.name}.json`], + dependencyPaths: ['connection.json', 'foreign-keys.json'], + peerFileIndex: [], + })), }; }, }; } +function nativeScanSnapshot(): KtxSchemaSnapshot { + return { + connectionId: 'warehouse', + driver: 'postgres', + extractedAt: '2026-04-29T09:00:00.000Z', + scope: { schemas: ['public'] }, + metadata: {}, + tables: [ + { + catalog: null, + db: 'public', + name: 'orders', + kind: 'table', + comment: 'Orders', + estimatedRows: 1, + foreignKeys: [], + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number', + nullable: false, + primaryKey: true, + comment: 'Order id', + }, + ], + }, + ], + }; +} + +function nativeScanConnector(options: { cleanup?: () => Promise } = {}): KtxScanConnector { + return { + id: 'test:warehouse', + driver: 'postgres', + capabilities: { + structuralIntrospection: true, + tableSampling: true, + columnSampling: true, + columnStats: false, + readOnlySql: false, + nestedAnalysis: false, + eventStreamDiscovery: false, + formalForeignKeys: false, + estimatedRowCounts: false, + }, + introspect: vi.fn(async () => nativeScanSnapshot()), + sampleTable: vi.fn(async () => ({ headers: ['id'], rows: [[1]], totalRows: 1 })), + sampleColumn: vi.fn(async () => ({ values: ['1'], nullCount: 0, distinctCount: 1 })), + ...(options.cleanup ? { cleanup: options.cleanup } : {}), + }; +} + describe('local scan', () => { let tempDir: string; let project: KtxLocalProject; @@ -338,6 +424,59 @@ describe('local scan', () => { }); }); + it('threads the structural snapshot into enrichment without connector re-introspection', async () => { + project.config.scan.enrichment = { mode: 'deterministic' }; + const connector = nativeScanConnector(); + const introspect = vi.mocked(connector.introspect); + + const result = await runLocalScan({ + project, + adapters: [fetchOnlyAdapter()], + connectionId: 'warehouse', + mode: 'enriched', + connector, + jobId: 'scan-enrichment-snapshot-threading', + now: () => new Date('2026-04-29T09:11:00.000Z'), + }); + + expect(result.report.enrichment.tableDescriptions).toBe('completed'); + expect(introspect).not.toHaveBeenCalled(); + }); + + it('cleans up a scan connector constructed by local scan', async () => { + const cleanup = vi.fn(async () => undefined); + + await runLocalScan({ + project, + adapters: [fetchOnlyAdapter()], + connectionId: 'warehouse', + mode: 'relationships', + detectRelationships: true, + createConnector: vi.fn(async () => nativeScanConnector({ cleanup })), + jobId: 'scan-owned-connector-cleanup', + now: () => new Date('2026-04-29T09:13:00.000Z'), + }); + + expect(cleanup).toHaveBeenCalledTimes(1); + }); + + it('does not clean up a caller-supplied scan connector', async () => { + const cleanup = vi.fn(async () => undefined); + + await runLocalScan({ + project, + adapters: [fetchOnlyAdapter()], + connectionId: 'warehouse', + mode: 'relationships', + detectRelationships: true, + connector: nativeScanConnector({ cleanup }), + jobId: 'scan-supplied-connector-cleanup', + now: () => new Date('2026-04-29T09:13:30.000Z'), + }); + + expect(cleanup).not.toHaveBeenCalled(); + }); + it('reuses scan report and raw-source paths when the same local scan run id is retried', async () => { const first = await runLocalScan({ project, @@ -520,10 +659,11 @@ describe('local scan', () => { }; }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -607,10 +747,11 @@ describe('local scan', () => { return relationshipSqlResult(input); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -837,10 +978,11 @@ describe('local scan', () => { return relationshipSqlResult(input); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -946,10 +1088,11 @@ describe('local scan', () => { return relationshipSqlResult(input); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -1072,10 +1215,11 @@ describe('local scan', () => { return relationshipSqlResult(input); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'enriched', connector, @@ -1202,10 +1346,11 @@ describe('local scan', () => { return relationshipSqlResult(input, { throwOnCoverage: true }); }, }; + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const result = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'relationships', detectRelationships: true, @@ -1337,7 +1482,8 @@ describe('local scan', () => { join(project.projectDir, 'semantic-layer/warehouse/_schema/public.yaml'), 'utf-8', ); - expect(manifestRaw).toContain('ai: "Deterministic description'); + expect(manifestRaw).toContain('ai: |-'); + expect(manifestRaw).toContain('Deterministic description'); }); it('persists structural artifacts and a recoverable warning when standalone enrichment execution fails', async () => { @@ -1510,10 +1656,11 @@ describe('local scan', () => { }, }; const llmRuntime = deterministicLlmRuntime(); + const adapter = fetchOnlyAdapter({ snapshot: await connector.introspect() }); const first = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'enriched', connector, @@ -1542,7 +1689,7 @@ describe('local scan', () => { const generateObject = vi.spyOn(llmRuntime, 'generateObject'); const retry = await runLocalScan({ project, - adapters: [fetchOnlyAdapter()], + adapters: [adapter], connectionId: 'warehouse', mode: 'enriched', connector, @@ -1568,7 +1715,6 @@ describe('local scan', () => { failedStages: [], }); expect(retry.report.enrichment.embeddings).toBe('completed'); - expect(generateObject).toHaveBeenCalledTimes(1); expect(generateObject).toHaveBeenCalledWith(expect.objectContaining({ role: 'candidateExtraction' })); expect(embeddingAttempts).toBe(2); diff --git a/packages/cli/src/context/scan/local-scan.ts b/packages/cli/src/context/scan/local-scan.ts index 9a06dabe..cb886991 100644 --- a/packages/cli/src/context/scan/local-scan.ts +++ b/packages/cli/src/context/scan/local-scan.ts @@ -30,6 +30,7 @@ import type { KtxScanReport, KtxScanTrigger, KtxScanWarning, + KtxSchemaSnapshot, } from './types.js'; function enrichmentResolutionWarning( @@ -388,6 +389,9 @@ export async function runLocalScan(options: RunLocalScanOptions): Promise { scaleExecutor.close(); } }); + + it('profiles tables concurrently up to profileConcurrency', async () => { + let inFlight = 0; + let maxInFlight = 0; + const executor = { + executeReadOnly: vi.fn(async (input: KtxReadOnlyQueryInput) => { + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + await new Promise((resolve) => setTimeout(resolve, 10)); + inFlight -= 1; + return { + headers: [ + 'column_name', + 'table_row_count', + 'row_count', + 'null_count', + 'distinct_count', + 'min_text_length', + 'max_text_length', + 'sample_values', + ], + rows: [[input.sql.includes('accounts') ? 'id' : 'account_id', 2, 2, 0, 2, 1, 2, '1\u001f2']], + totalRows: 1, + rowCount: 1, + }; + }), + }; + + await profileKtxRelationshipSchema({ + connectionId: 'warehouse', + driver: 'sqlite', + schema: schemaWithTables(['accounts', 'orders', 'payments', 'refunds']), + executor, + ctx: { runId: 'profile-concurrency' }, + profileConcurrency: 4, + }); + + expect(maxInFlight).toBe(4); + }); + + it('keeps profiling other tables when one table profile fails', async () => { + const executor = { + executeReadOnly: vi.fn(async (input: KtxReadOnlyQueryInput) => { + if (input.sql.includes('"orders"')) { + throw new Error('orders unavailable'); + } + return { + headers: [ + 'column_name', + 'table_row_count', + 'row_count', + 'null_count', + 'distinct_count', + 'min_text_length', + 'max_text_length', + 'sample_values', + ], + rows: [['id', 2, 2, 0, 2, 1, 2, '1\u001f2']], + totalRows: 1, + rowCount: 1, + }; + }), + }; + + const result = await profileKtxRelationshipSchema({ + connectionId: 'warehouse', + driver: 'sqlite', + schema: schemaWithTables(['accounts', 'orders']), + executor, + ctx: { runId: 'profile-error-isolated' }, + profileConcurrency: 2, + }); + + expect(result.warnings).toContain('profile_failed:orders:orders unavailable'); + expect(result.tables).toHaveLength(2); + expect(Object.keys(result.columns)).toContain('accounts.id'); + }); }); + +function schemaWithTables(names: string[]): KtxEnrichedSchema { + return schema( + names.map((name) => + table(name, [ + column(name, name === 'orders' ? 'account_id' : 'id', { + nullable: false, + primaryKey: name !== 'orders', + }), + ]), + ), + ); +} diff --git a/packages/cli/src/context/scan/relationship-profiling.ts b/packages/cli/src/context/scan/relationship-profiling.ts index 77f1c38d..1824d263 100644 --- a/packages/cli/src/context/scan/relationship-profiling.ts +++ b/packages/cli/src/context/scan/relationship-profiling.ts @@ -1,4 +1,5 @@ import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js'; +import { mapWithConcurrency } from './relationship-validation.js'; import type { KtxConnectionDriver, KtxQueryResult, @@ -60,6 +61,7 @@ export interface ProfileKtxRelationshipSchemaInput { ctx: KtxScanContext; sampleValuesPerColumn?: number; profileSampleRows?: number; + profileConcurrency?: number; cache?: KtxRelationshipProfileCache; } @@ -389,6 +391,10 @@ async function queryTableProfile(input: { }; } +type TableProfileResult = + | { tableProfile: Awaited> } + | { cached: KtxRelationshipCachedTableProfile; queryCount: 0 }; + export async function profileKtxRelationshipSchema( input: ProfileKtxRelationshipSchemaInput, ): Promise { @@ -408,54 +414,68 @@ export async function profileKtxRelationshipSchema( const tables: KtxRelationshipTableProfile[] = []; const columns: Record = {}; const warnings: string[] = []; + const executor = input.executor; - for (const table of input.schema.tables.filter((candidate) => candidate.enabled)) { - const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5; - const profileSampleRows = input.profileSampleRows ?? 10000; - const cacheKey = tableProfileCacheKey({ - connectionId: input.connectionId, - driver: input.driver, - ctx: input.ctx, - table: table.ref, - sampleValuesPerColumn, - profileSampleRows, - }); - const cached = input.cache?.tableProfiles.get(cacheKey); - if (cached) { - tables.push(cached.table); - Object.assign(columns, cached.columns); - for (const warning of cached.warnings) { - warnings.push(warning); - } - continue; - } - - try { - const tableProfile = await queryTableProfile({ + const enabledTables = input.schema.tables.filter((candidate) => candidate.enabled); + const tableResults = await mapWithConcurrency( + enabledTables, + input.profileConcurrency ?? 4, + async (table) => { + const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5; + const profileSampleRows = input.profileSampleRows ?? 10000; + const cacheKey = tableProfileCacheKey({ connectionId: input.connectionId, driver: input.driver, - table, - executor: input.executor, ctx: input.ctx, + table: table.ref, sampleValuesPerColumn, profileSampleRows, }); - queryTotal += tableProfile.queryCount; - tables.push(tableProfile.table); - Object.assign(columns, tableProfile.columns); - input.cache?.tableProfiles.set(cacheKey, { - table: tableProfile.table, - columns: tableProfile.columns, - warnings: [], - }); - } catch (error) { - const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`; - warnings.push(failureWarning); - input.cache?.tableProfiles.set(cacheKey, { - table: { table: table.ref, rowCount: 0 }, - columns: {}, - warnings: [failureWarning], - }); + const cached = input.cache?.tableProfiles.get(cacheKey); + if (cached) { + return { cached, queryCount: 0 }; + } + + try { + const tableProfile = await queryTableProfile({ + connectionId: input.connectionId, + driver: input.driver, + table, + executor, + ctx: input.ctx, + sampleValuesPerColumn, + profileSampleRows, + }); + input.cache?.tableProfiles.set(cacheKey, { + table: tableProfile.table, + columns: tableProfile.columns, + warnings: [], + }); + return { tableProfile }; + } catch (error) { + const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`; + const cachedFailure = { + table: { table: table.ref, rowCount: 0 }, + columns: {}, + warnings: [failureWarning], + }; + input.cache?.tableProfiles.set(cacheKey, cachedFailure); + return { cached: cachedFailure, queryCount: 0 }; + } + }, + ); + + for (const result of tableResults) { + if ('tableProfile' in result) { + queryTotal += result.tableProfile.queryCount; + tables.push(result.tableProfile.table); + Object.assign(columns, result.tableProfile.columns); + continue; + } + tables.push(result.cached.table); + Object.assign(columns, result.cached.columns); + for (const warning of result.cached.warnings) { + warnings.push(warning); } } diff --git a/packages/cli/src/context/scan/relationship-validation.ts b/packages/cli/src/context/scan/relationship-validation.ts index 63d7328a..685d1ea9 100644 --- a/packages/cli/src/context/scan/relationship-validation.ts +++ b/packages/cli/src/context/scan/relationship-validation.ts @@ -193,7 +193,7 @@ function statusFor(input: { return 'rejected'; } -async function mapWithConcurrency( +export async function mapWithConcurrency( inputs: readonly TInput[], concurrency: number, mapOne: (input: TInput) => Promise, diff --git a/packages/cli/src/scan.test.ts b/packages/cli/src/scan.test.ts index 16cfdbd3..5ec745e6 100644 --- a/packages/cli/src/scan.test.ts +++ b/packages/cli/src/scan.test.ts @@ -96,14 +96,17 @@ const createSnowflakeLiveDatabaseIntrospection = vi.hoisted(() => const isKtxSnowflakeConnectionConfig = vi.hoisted(() => vi.fn((connection: { driver?: string } | undefined) => connection?.driver === 'snowflake'), ); +const snowflakeConnectorInstances = vi.hoisted(() => [] as Array<{ cleanup: ReturnType }>); const KtxSnowflakeScanConnector = vi.hoisted( () => class { readonly id: string; readonly driver = 'snowflake'; + readonly cleanup = vi.fn(async () => undefined); constructor(options: { connectionId: string }) { this.id = `snowflake:${options.connectionId}`; + snowflakeConnectorInstances.push(this); } }, ); @@ -1047,6 +1050,95 @@ describe('runKtxScan', () => { await rm(tempProject, { recursive: true, force: true }); }); + it('cleans up a constructed scan connector after an enriched scan succeeds', async () => { + await initKtxProject({ projectDir: tempDir }); + await writeFile( + join(tempDir, 'ktx.yaml'), + [ + 'connections:', + ' warehouse:', + ' driver: snowflake', + ' account: acct', + ' warehouse: WH', + ' database: ANALYTICS', + ' schema_name: PUBLIC', + ' username: reader', + ' password: env:SNOWFLAKE_PASSWORD', + '', + ].join('\n'), + 'utf-8', + ); + snowflakeConnectorInstances.length = 0; + const runLocalScan = vi.fn(async (): Promise => ({ + runId: 'scan-run-cleanup', + status: 'done', + done: true, + connectionId: 'warehouse', + mode: 'enriched', + dryRun: false, + syncId: 'sync-1', + report: { ...report, mode: 'enriched' }, + })); + + await expect( + runKtxScan( + { + command: 'run', + projectDir: tempDir, + connectionId: 'warehouse', + mode: 'enriched', + detectRelationships: false, + dryRun: false, + }, + makeIo().io, + { runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters }, + ), + ).resolves.toBe(0); + + expect(snowflakeConnectorInstances[0]?.cleanup).toHaveBeenCalledTimes(1); + }); + + it('cleans up a constructed scan connector after runLocalScan throws', async () => { + await initKtxProject({ projectDir: tempDir }); + await writeFile( + join(tempDir, 'ktx.yaml'), + [ + 'connections:', + ' warehouse:', + ' driver: snowflake', + ' account: acct', + ' warehouse: WH', + ' database: ANALYTICS', + ' schema_name: PUBLIC', + ' username: reader', + ' password: env:SNOWFLAKE_PASSWORD', + '', + ].join('\n'), + 'utf-8', + ); + snowflakeConnectorInstances.length = 0; + const runLocalScan = vi.fn(async () => { + throw new Error('scan failed'); + }); + + await expect( + runKtxScan( + { + command: 'run', + projectDir: tempDir, + connectionId: 'warehouse', + mode: 'relationships', + detectRelationships: true, + dryRun: false, + }, + makeIo().io, + { runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters }, + ), + ).resolves.toBe(1); + + expect(snowflakeConnectorInstances[0]?.cleanup).toHaveBeenCalledTimes(1); + }); + it('routes standalone postgres scans through the native connector before daemon fallback', async () => { const tempProject = await mkdtemp(join(tmpdir(), 'ktx-scan-cli-native-postgres-')); await initKtxProject({ projectDir: tempProject }); diff --git a/packages/cli/src/scan.ts b/packages/cli/src/scan.ts index f40da497..94b80f65 100644 --- a/packages/cli/src/scan.ts +++ b/packages/cli/src/scan.ts @@ -375,6 +375,7 @@ export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps writeRunSummary(result.report, args.projectDir, io); } finally { cliProgress?.flush(); + await connector?.cleanup?.(); } return 0; } catch (error) { diff --git a/packages/cli/src/setup-databases.test.ts b/packages/cli/src/setup-databases.test.ts index c8156b9d..50a1c6ed 100644 --- a/packages/cli/src/setup-databases.test.ts +++ b/packages/cli/src/setup-databases.test.ts @@ -545,7 +545,7 @@ describe('setup databases step', () => { }, { driver: 'snowflake', - selectValues: ['no'], + selectValues: ['password', 'no'], textValues: ['', 'env:SNOWFLAKE_ACCOUNT', 'ANALYTICS_WH', 'ANALYTICS', 'env:SNOWFLAKE_USER', ''], passwordValues: ['env:SNOWFLAKE_PASSWORD'], expectedTextPrompts: [ @@ -2090,6 +2090,7 @@ describe('setup databases step', () => { scanConnection: vi.fn(async () => 0), historicSqlProbe, prompts: makePromptAdapter({ + selectValues: ['password'], textValues: ['env:SNOWFLAKE_ACCOUNT', 'WH', 'ANALYTICS', 'reader', ''], passwordValues: ['env:SNOWFLAKE_PASSWORD'], }), @@ -2131,6 +2132,51 @@ describe('setup databases step', () => { expect(config.ingest.adapters).toEqual([]); }); + it('configures Snowflake with RSA key-pair auth via setup wizard', async () => { + const io = makeIo(); + const result = await runKtxSetupDatabasesStep( + { + projectDir: tempDir, + inputMode: 'disabled', + databaseDrivers: ['snowflake'], + databaseConnectionId: 'snowflake', + databaseSchemas: [], + skipDatabases: false, + }, + io.io, + { + testConnection: vi.fn(async () => 0), + scanConnection: vi.fn(async () => 0), + prompts: makePromptAdapter({ + selectValues: ['rsa'], + textValues: [ + 'env:SNOWFLAKE_ACCOUNT', + 'WH', + 'ANALYTICS', + 'reader', + '~/.ssh/snowflake_rsa_key.p8', + '', + ], + passwordValues: ['env:SNOWFLAKE_KEY_PASS'], + }), + }, + ); + + expect(result.status).toBe('ready'); + const config = parseKtxProjectConfig(await readFile(join(tempDir, 'ktx.yaml'), 'utf-8')); + expect(config.connections.snowflake).toMatchObject({ + driver: 'snowflake', + authMethod: 'rsa', + account: 'env:SNOWFLAKE_ACCOUNT', + warehouse: 'WH', + database: 'ANALYTICS', + username: 'reader', + privateKey: 'file:~/.ssh/snowflake_rsa_key.p8', // pragma: allowlist secret + passphrase: 'env:SNOWFLAKE_KEY_PASS', // pragma: allowlist secret + }); + expect(config.connections.snowflake.password).toBeUndefined(); + }); + it('writes Postgres query history config with minExecutions and ignores window/redaction output', async () => { const io = makeIo(); const result = await runKtxSetupDatabasesStep( diff --git a/packages/cli/src/setup-databases.ts b/packages/cli/src/setup-databases.ts index c82846ff..30d4fa20 100644 --- a/packages/cli/src/setup-databases.ts +++ b/packages/cli/src/setup-databases.ts @@ -1015,30 +1015,80 @@ async function buildConnectionConfig(input: { stringConfigField(input.existingConnection, 'username'), ); if (username === undefined) return 'back'; - const passwordRef = await promptCredential({ - prompts, - message: 'Snowflake password', - projectDir: args.projectDir, - connectionId: input.connectionId, - secretName: 'password', // pragma: allowlist secret + const authChoice = await prompts.select({ + message: 'Snowflake authentication method', + options: [ + { value: 'password', label: 'Password' }, + { value: 'rsa', label: 'Key-pair (RSA / JWT)' }, + { value: 'back', label: 'Back' }, + ], }); - if (passwordRef === 'back') return 'back'; // pragma: allowlist secret + if (authChoice === 'back') return 'back'; + const authMethod: 'password' | 'rsa' = authChoice === 'rsa' ? 'rsa' : 'password'; + let passwordRef: string | null = null; + let privateKeyInput: string | undefined; + let passphraseRef: string | null = null; + if (authMethod === 'password') { + const ref = await promptCredential({ + prompts, + message: 'Snowflake password', + projectDir: args.projectDir, + connectionId: input.connectionId, + secretName: 'password', // pragma: allowlist secret + }); + if (ref === 'back') return 'back'; // pragma: allowlist secret + passwordRef = ref; + } else { + privateKeyInput = await promptText( + prompts, + 'Path to Snowflake private key (PEM)\nFor example ~/.ssh/snowflake_rsa_key.p8, or $ENV_VAR / env:NAME / file:/abs/path.', + displayFileReference(stringConfigField(input.existingConnection, 'privateKey')), + ); + if (privateKeyInput === undefined) return 'back'; + const phr = await promptCredential({ + prompts, + message: 'Private key passphrase (optional)\nPress Enter to skip.', + projectDir: args.projectDir, + connectionId: input.connectionId, + secretName: 'snowflake-passphrase', // pragma: allowlist secret + }); + if (phr === 'back') return 'back'; + passphraseRef = phr; + } const role = await promptText( prompts, 'Snowflake role (optional)\nPress Enter to skip.', stringConfigField(input.existingConnection, 'role'), ); if (role === undefined) return 'back'; - const resolvedPasswordRef = passwordRef ?? stringConfigField(input.existingConnection, 'password'); - if (!account || !warehouse || !database || !username || !resolvedPasswordRef) return null; + if (authMethod === 'password') { + const resolvedPasswordRef = passwordRef ?? stringConfigField(input.existingConnection, 'password'); + if (!account || !warehouse || !database || !username || !resolvedPasswordRef) return null; + return { + driver: 'snowflake', + authMethod: 'password', + account, + warehouse, + database, + username, + password: resolvedPasswordRef, + ...(role ? { role } : {}), + }; + } + const resolvedPrivateKey = privateKeyInput + ? normalizeFileReference(privateKeyInput) + : stringConfigField(input.existingConnection, 'privateKey'); + if (!account || !warehouse || !database || !username || !resolvedPrivateKey) return null; + const resolvedPassphrase = passphraseRef ?? stringConfigField(input.existingConnection, 'passphrase'); return { driver: 'snowflake', - authMethod: 'password', + authMethod: 'rsa', account, warehouse, database, username, - password: resolvedPasswordRef, + privateKey: resolvedPrivateKey, + ...(resolvedPassphrase ? { passphrase: resolvedPassphrase } : {}), ...(role ? { role } : {}), }; } diff --git a/packages/cli/src/telemetry/project-snapshot.test.ts b/packages/cli/src/telemetry/project-snapshot.test.ts index daf4e766..a1c06472 100644 --- a/packages/cli/src/telemetry/project-snapshot.test.ts +++ b/packages/cli/src/telemetry/project-snapshot.test.ts @@ -47,6 +47,7 @@ describe('buildProjectStackSnapshotFields', () => { maxLlmTablesPerBatch: 40, maxCandidatesPerColumn: 25, profileSampleRows: 10000, + profileConcurrency: 4, validationConcurrency: 4, }, },