diff --git a/packages/context/src/scan/local-enrichment.test.ts b/packages/context/src/scan/local-enrichment.test.ts index c25dae61..cbed687d 100644 --- a/packages/context/src/scan/local-enrichment.test.ts +++ b/packages/context/src/scan/local-enrichment.test.ts @@ -427,6 +427,69 @@ describe('local scan enrichment', () => { expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 }); }); + it('generates table descriptions with bounded table-level concurrency', async () => { + const concurrentSnapshot: KtxSchemaSnapshot = { + ...snapshot, + tables: Array.from({ length: 8 }, (_, index) => ({ + catalog: null, + db: 'public', + name: `table_${index + 1}`, + kind: 'table' as const, + comment: null, + estimatedRows: 2, + foreignKeys: [], + columns: [ + { + name: 'id', + nativeType: 'integer', + normalizedType: 'integer', + dimensionType: 'number' as const, + nullable: false, + primaryKey: true, + comment: null, + }, + ], + })), + }; + let activeColumnSamples = 0; + let maxActiveColumnSamples = 0; + const scanConnector = { + ...connector(), + introspect: vi.fn(async () => concurrentSnapshot), + sampleColumn: vi.fn(async () => { + activeColumnSamples += 1; + maxActiveColumnSamples = Math.max(maxActiveColumnSamples, activeColumnSamples); + await new Promise((resolve) => setTimeout(resolve, 10)); + activeColumnSamples -= 1; + return { + values: ['1'], + nullCount: 0, + distinctCount: 1, + }; + }), + sampleTable: vi.fn(async () => ({ + headers: ['id'], + rows: [[1]], + totalRows: 1, + })), + }; + const settings = { + ...buildDefaultKtxProjectConfig('test').scan.relationships, + enabled: false, + }; + + await runLocalScanEnrichment({ + connectionId: 'warehouse', + mode: 'enriched', + connector: scanConnector, + context: { runId: 'scan-run-concurrent-descriptions' }, + providers: createDeterministicLocalScanEnrichmentProviders({ embeddingDimensions: 3 }), + relationshipSettings: settings, + }); + + expect(maxActiveColumnSamples).toBe(6); + }); + it('reports enrichment progress for countable stages', async () => { const events: Array<{ progress: number; message?: string; transient?: boolean }> = []; const progress = { @@ -713,7 +776,7 @@ describe('local scan enrichment', () => { model: 'provider/embedding-model', dimensions: 1536, batchSize: 8, - openai: { api_key: 'env:OPENAI_API_KEY' }, + openai: { api_key: 'env:OPENAI_API_KEY' }, // pragma: allowlist secret }, }, { @@ -726,7 +789,7 @@ describe('local scan enrichment', () => { { createKtxLlmProvider: createKtxLlmProvider as any, createKtxEmbeddingProvider: createKtxEmbeddingProvider as any, - env: { OPENAI_API_KEY: 'openai-key' }, + env: { OPENAI_API_KEY: 'openai-key' }, // pragma: allowlist secret }, ); diff --git a/packages/context/src/scan/local-enrichment.ts b/packages/context/src/scan/local-enrichment.ts index cefecadb..5d58e189 100644 --- a/packages/context/src/scan/local-enrichment.ts +++ b/packages/context/src/scan/local-enrichment.ts @@ -1,4 +1,5 @@ import type { KtxLlmProvider } from '@ktx/llm'; +import pLimit from 'p-limit'; import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js'; import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js'; import { buildKtxColumnEmbeddingText } from './embedding-text.js'; @@ -40,6 +41,8 @@ import type { KtxTableRef, } from './types.js'; +const DESCRIPTION_TABLE_CONCURRENCY = 6; + export interface DeterministicLocalScanEnrichmentProviderOptions { embeddingDimensions?: number; maxBatchSize?: number; @@ -322,41 +325,47 @@ async function generateDescriptions(input: { await input.progress?.update(1, 'No tables to describe'); return updates; } - for (const [index, table] of input.snapshot.tables.entries()) { - await input.progress?.update( - (index + 1) / totalTables, - `Generating descriptions ${index + 1}/${totalTables} tables`, - { - transient: true, - }, - ); - const tableInput = descriptionTable(table); - const columnResult = await generator.generateColumnDescriptions({ - connectionId: input.snapshot.connectionId, - connector: input.connector, - context: input.context, - dataSourceType: input.snapshot.driver, - supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis, - table: tableInput, - }); - const tableDescription = await generator.generateTableDescription({ - connectionId: input.snapshot.connectionId, - connector: input.connector, - context: input.context, - dataSourceType: input.snapshot.driver, - table: { - catalog: table.catalog, - db: table.db, - name: table.name, - rawDescriptions: table.comment ? { db: table.comment } : {}, - }, - }); - updates.push({ - table: tableRef(table), - tableDescription, - columnDescriptions: Object.fromEntries(columnResult.columnDescriptions), - }); - } + const limitTable = pLimit(DESCRIPTION_TABLE_CONCURRENCY); + const tableUpdates = await Promise.all( + input.snapshot.tables.map((table, index) => + limitTable(async () => { + await input.progress?.update( + (index + 1) / totalTables, + `Generating descriptions ${index + 1}/${totalTables} tables`, + { + transient: true, + }, + ); + const tableInput = descriptionTable(table); + const columnResult = await generator.generateColumnDescriptions({ + connectionId: input.snapshot.connectionId, + connector: input.connector, + context: input.context, + dataSourceType: input.snapshot.driver, + supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis, + table: tableInput, + }); + const tableDescription = await generator.generateTableDescription({ + connectionId: input.snapshot.connectionId, + connector: input.connector, + context: input.context, + dataSourceType: input.snapshot.driver, + table: { + catalog: table.catalog, + db: table.db, + name: table.name, + rawDescriptions: table.comment ? { db: table.comment } : {}, + }, + }); + return { + table: tableRef(table), + tableDescription, + columnDescriptions: Object.fromEntries(columnResult.columnDescriptions), + }; + }), + ), + ); + updates.push(...tableUpdates); await input.progress?.update(1, `Generated descriptions for ${totalTables} tables`); return updates; }