perf: parallelize scan description generation

This commit is contained in:
Andrey Avtomonov 2026-05-12 14:34:59 +02:00
parent 22e1706907
commit 366933c755
2 changed files with 109 additions and 37 deletions

View file

@ -427,6 +427,69 @@ describe('local scan enrichment', () => {
expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 });
});
it('generates table descriptions with bounded table-level concurrency', async () => {
  // Build a snapshot of eight single-column tables so that more tables exist
  // than the number of sampleColumn calls expected to overlap (6, see the
  // final assertion).
  const tableCount = 8;
  const concurrentSnapshot: KtxSchemaSnapshot = {
    ...snapshot,
    tables: Array.from({ length: tableCount }, (_, tableIndex) => {
      const idColumn = {
        name: 'id',
        nativeType: 'integer',
        normalizedType: 'integer',
        dimensionType: 'number' as const,
        nullable: false,
        primaryKey: true,
        comment: null,
      };
      return {
        catalog: null,
        db: 'public',
        name: `table_${tableIndex + 1}`,
        kind: 'table' as const,
        comment: null,
        estimatedRows: 2,
        foreignKeys: [],
        columns: [idColumn],
      };
    }),
  };

  // Tracks how many sampleColumn calls are pending right now and the highest
  // number ever pending at the same time.
  const inFlight = { current: 0, peak: 0 };

  const scanConnector = {
    ...connector(),
    introspect: vi.fn(async () => concurrentSnapshot),
    sampleColumn: vi.fn(async () => {
      inFlight.current += 1;
      if (inFlight.current > inFlight.peak) {
        inFlight.peak = inFlight.current;
      }
      // Keep the call pending for 10ms so overlapping calls become observable.
      await new Promise((resolve) => setTimeout(resolve, 10));
      inFlight.current -= 1;
      return {
        values: ['1'],
        nullCount: 0,
        distinctCount: 1,
      };
    }),
    sampleTable: vi.fn(async () => ({
      headers: ['id'],
      rows: [[1]],
      totalRows: 1,
    })),
  };

  // Default relationship settings with the `enabled` flag switched off.
  const settings = {
    ...buildDefaultKtxProjectConfig('test').scan.relationships,
    enabled: false,
  };

  await runLocalScanEnrichment({
    connectionId: 'warehouse',
    mode: 'enriched',
    connector: scanConnector,
    context: { runId: 'scan-run-concurrent-descriptions' },
    providers: createDeterministicLocalScanEnrichmentProviders({ embeddingDimensions: 3 }),
    relationshipSettings: settings,
  });

  // With eight tables available, at most six column samples may overlap.
  expect(inFlight.peak).toBe(6);
});
it('reports enrichment progress for countable stages', async () => {
const events: Array<{ progress: number; message?: string; transient?: boolean }> = [];
const progress = {
@ -713,7 +776,7 @@ describe('local scan enrichment', () => {
model: 'provider/embedding-model',
dimensions: 1536,
batchSize: 8,
openai: { api_key: 'env:OPENAI_API_KEY' },
openai: { api_key: 'env:OPENAI_API_KEY' }, // pragma: allowlist secret
},
},
{
@ -726,7 +789,7 @@ describe('local scan enrichment', () => {
{
createKtxLlmProvider: createKtxLlmProvider as any,
createKtxEmbeddingProvider: createKtxEmbeddingProvider as any,
env: { OPENAI_API_KEY: 'openai-key' },
env: { OPENAI_API_KEY: 'openai-key' }, // pragma: allowlist secret
},
);

View file

@ -1,4 +1,5 @@
import type { KtxLlmProvider } from '@ktx/llm';
import pLimit from 'p-limit';
import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js';
import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js';
import { buildKtxColumnEmbeddingText } from './embedding-text.js';
@ -40,6 +41,8 @@ import type {
KtxTableRef,
} from './types.js';
const DESCRIPTION_TABLE_CONCURRENCY = 6;
export interface DeterministicLocalScanEnrichmentProviderOptions {
embeddingDimensions?: number;
maxBatchSize?: number;
@ -322,41 +325,47 @@ async function generateDescriptions(input: {
await input.progress?.update(1, 'No tables to describe');
return updates;
}
for (const [index, table] of input.snapshot.tables.entries()) {
await input.progress?.update(
(index + 1) / totalTables,
`Generating descriptions ${index + 1}/${totalTables} tables`,
{
transient: true,
},
);
const tableInput = descriptionTable(table);
const columnResult = await generator.generateColumnDescriptions({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis,
table: tableInput,
});
const tableDescription = await generator.generateTableDescription({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
table: {
catalog: table.catalog,
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
},
});
updates.push({
table: tableRef(table),
tableDescription,
columnDescriptions: Object.fromEntries(columnResult.columnDescriptions),
});
}
const limitTable = pLimit(DESCRIPTION_TABLE_CONCURRENCY);
const tableUpdates = await Promise.all(
input.snapshot.tables.map((table, index) =>
limitTable(async () => {
await input.progress?.update(
(index + 1) / totalTables,
`Generating descriptions ${index + 1}/${totalTables} tables`,
{
transient: true,
},
);
const tableInput = descriptionTable(table);
const columnResult = await generator.generateColumnDescriptions({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis,
table: tableInput,
});
const tableDescription = await generator.generateTableDescription({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
table: {
catalog: table.catalog,
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
},
});
return {
table: tableRef(table),
tableDescription,
columnDescriptions: Object.fromEntries(columnResult.columnDescriptions),
};
}),
),
);
updates.push(...tableUpdates);
await input.progress?.update(1, `Generated descriptions for ${totalTables} tables`);
return updates;
}