From 48c0b3bb3b53be0ad801d645dae133702de314b9 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Fri, 22 May 2026 17:00:26 +0200 Subject: [PATCH] feat(scan): parallelize relationship profiling --- .../cli/src/context/project/config.test.ts | 6 ++ packages/cli/src/context/project/config.ts | 5 + .../scan/local-enrichment-artifacts.test.ts | 4 + .../scan/local-enrichment-artifacts.ts | 1 + .../context/scan/relationship-diagnostics.ts | 2 + .../context/scan/relationship-discovery.ts | 1 + .../scan/relationship-profiling.test.ts | 92 ++++++++++++++++++- .../context/scan/relationship-profiling.ts | 37 +++++--- .../context/scan/relationship-validation.ts | 2 +- 9 files changed, 135 insertions(+), 15 deletions(-) diff --git a/packages/cli/src/context/project/config.test.ts b/packages/cli/src/context/project/config.test.ts index 3b7f2feb..55188aa2 100644 --- a/packages/cli/src/context/project/config.test.ts +++ b/packages/cli/src/context/project/config.test.ts @@ -74,6 +74,7 @@ connections: maxLlmTablesPerBatch: 40, maxCandidatesPerColumn: 25, profileSampleRows: 10000, + profileConcurrency: 4, validationConcurrency: 4, }, }, @@ -278,6 +279,7 @@ scan: maxLlmTablesPerBatch: 12 maxCandidatesPerColumn: 7 profileSampleRows: 500 + profileConcurrency: 3 validationConcurrency: 2 validationBudget: 0 `); @@ -291,6 +293,7 @@ scan: maxLlmTablesPerBatch: 12, maxCandidatesPerColumn: 7, profileSampleRows: 500, + profileConcurrency: 3, validationConcurrency: 2, validationBudget: 0, }); @@ -302,6 +305,7 @@ scan: expect(serializeKtxProjectConfig(config)).toContain('maxLlmTablesPerBatch: 12'); expect(serializeKtxProjectConfig(config)).toContain('maxCandidatesPerColumn: 7'); expect(serializeKtxProjectConfig(config)).toContain('profileSampleRows: 500'); + expect(serializeKtxProjectConfig(config)).toContain('profileConcurrency: 3'); expect(serializeKtxProjectConfig(config)).toContain('validationConcurrency: 2'); expect(serializeKtxProjectConfig(config)).toContain('validationBudget: 0'); }); @@ -326,6 +330,7 @@ scan: maxLlmTablesPerBatch: 0 maxCandidatesPerColumn: -4 profileSampleRows: 0 + profileConcurrency: 0 validationConcurrency: 0 validationBudget: 1.5 `; @@ -341,6 +346,7 @@ scan: 'scan.relationships.maxLlmTablesPerBatch', 'scan.relationships.maxCandidatesPerColumn', 'scan.relationships.profileSampleRows', + 'scan.relationships.profileConcurrency', 'scan.relationships.validationConcurrency', 'scan.relationships.validationBudget', ]), diff --git a/packages/cli/src/context/project/config.ts b/packages/cli/src/context/project/config.ts index 2824ca59..e83f502e 100644 --- a/packages/cli/src/context/project/config.ts +++ b/packages/cli/src/context/project/config.ts @@ -163,6 +163,11 @@ const scanRelationshipsSchema = z .default(25) .describe('Maximum number of candidate join partners considered per column during relationship discovery.'), profileSampleRows: z.int().positive().default(10000).describe('Number of rows sampled per table when profiling values for relationship inference.'), + profileConcurrency: z + .int() + .positive() + .default(4) + .describe('Parallel relationship-profile queries run against the database during scan.'), validationConcurrency: z.int().positive().default(4).describe('Number of relationship validation queries run in parallel against the database.'), validationBudget: z .union([z.literal('all'), z.int().nonnegative()]) diff --git a/packages/cli/src/context/scan/local-enrichment-artifacts.test.ts b/packages/cli/src/context/scan/local-enrichment-artifacts.test.ts index 56994568..8a49fc78 100644 --- a/packages/cli/src/context/scan/local-enrichment-artifacts.test.ts +++ b/packages/cli/src/context/scan/local-enrichment-artifacts.test.ts @@ -289,6 +289,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => { maxLlmTablesPerBatch: 12, maxCandidatesPerColumn: 7, profileSampleRows: 500, + profileConcurrency: 3, validationConcurrency: 2, }, }); @@ -378,6 +379,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => { validationRequiredForManifest: true, maxCandidatesPerColumn: 7, profileSampleRows: 500, + profileConcurrency: 3, validationConcurrency: 2, }, profileWarnings: [], @@ -472,6 +474,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => { maxLlmTablesPerBatch: 40, maxCandidatesPerColumn: 25, profileSampleRows: 10000, + profileConcurrency: 4, validationConcurrency: 4, }, dryRun: false, @@ -741,6 +744,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => { maxLlmTablesPerBatch: 40, maxCandidatesPerColumn: 25, profileSampleRows: 10000, + profileConcurrency: 4, validationConcurrency: 4, }, dryRun: false, diff --git a/packages/cli/src/context/scan/local-enrichment-artifacts.ts b/packages/cli/src/context/scan/local-enrichment-artifacts.ts index 3a2d15f6..9de46a98 100644 --- a/packages/cli/src/context/scan/local-enrichment-artifacts.ts +++ b/packages/cli/src/context/scan/local-enrichment-artifacts.ts @@ -382,6 +382,7 @@ export async function writeLocalScanEnrichmentArtifacts( validationRequiredForManifest: input.relationshipSettings.validationRequiredForManifest, maxCandidatesPerColumn: input.relationshipSettings.maxCandidatesPerColumn, profileSampleRows: input.relationshipSettings.profileSampleRows, + profileConcurrency: input.relationshipSettings.profileConcurrency, validationConcurrency: input.relationshipSettings.validationConcurrency, } : undefined, diff --git a/packages/cli/src/context/scan/relationship-diagnostics.ts b/packages/cli/src/context/scan/relationship-diagnostics.ts index e50e892e..2437b21e 100644 --- a/packages/cli/src/context/scan/relationship-diagnostics.ts +++ b/packages/cli/src/context/scan/relationship-diagnostics.ts @@ -70,6 +70,7 @@ interface KtxRelationshipDiagnosticsPolicy { validationRequiredForManifest: boolean; maxCandidatesPerColumn: number; profileSampleRows: number; + profileConcurrency: number; validationConcurrency: number; } @@ -118,6 +119,7 @@ const DEFAULT_POLICY: KtxRelationshipDiagnosticsPolicy = { validationRequiredForManifest: true, maxCandidatesPerColumn: 25, profileSampleRows: 10000, + profileConcurrency: 4, validationConcurrency: 4, }; diff --git a/packages/cli/src/context/scan/relationship-discovery.ts b/packages/cli/src/context/scan/relationship-discovery.ts index c1197866..66a47395 100644 --- a/packages/cli/src/context/scan/relationship-discovery.ts +++ b/packages/cli/src/context/scan/relationship-discovery.ts @@ -228,6 +228,7 @@ export async function discoverKtxRelationships( executor, ctx: input.context, profileSampleRows: input.settings.profileSampleRows, + profileConcurrency: input.settings.profileConcurrency, cache: profileCache, }); const deterministicCandidates: KtxRelationshipDiscoveryCandidate[] = generateKtxRelationshipDiscoveryCandidates( diff --git a/packages/cli/src/context/scan/relationship-profiling.test.ts b/packages/cli/src/context/scan/relationship-profiling.test.ts index d034f1d9..76151d23 100644 --- a/packages/cli/src/context/scan/relationship-profiling.test.ts +++ b/packages/cli/src/context/scan/relationship-profiling.test.ts @@ -1,7 +1,7 @@ import { readFile } from 'node:fs/promises'; import { join } from 'node:path'; import Database from 'better-sqlite3'; -import { afterEach, describe, expect, it } from 'vitest'; +import { afterEach, describe, expect, it, vi } from 'vitest'; import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js'; import { snapshotToKtxEnrichedSchema } from './local-enrichment.js'; import { loadKtxRelationshipBenchmarkFixture, maskKtxRelationshipBenchmarkSnapshot } from './relationship-benchmarks.js'; @@ -351,4 +351,94 @@ describe('relationship profiling', () => { scaleExecutor.close(); } }); + + it('profiles tables concurrently up to profileConcurrency', async () => { + let inFlight = 0; + let maxInFlight = 0; + const executor = { + executeReadOnly: vi.fn(async (input: KtxReadOnlyQueryInput) => { + inFlight += 1; + maxInFlight = Math.max(maxInFlight, inFlight); + await new Promise((resolve) => setTimeout(resolve, 10)); + inFlight -= 1; + return { + headers: [ + 'column_name', + 'table_row_count', + 'row_count', + 'null_count', + 'distinct_count', + 'min_text_length', + 'max_text_length', + 'sample_values', + ], + rows: [[input.sql.includes('accounts') ? 'id' : 'account_id', 2, 2, 0, 2, 1, 2, '1\u001f2']], + totalRows: 1, + rowCount: 1, + }; + }), + }; + + await profileKtxRelationshipSchema({ + connectionId: 'warehouse', + driver: 'sqlite', + schema: schemaWithTables(['accounts', 'orders', 'payments', 'refunds']), + executor, + ctx: { runId: 'profile-concurrency' }, + profileConcurrency: 4, + }); + + expect(maxInFlight).toBe(4); + }); + + it('keeps profiling other tables when one table profile fails', async () => { + const executor = { + executeReadOnly: vi.fn(async (input: KtxReadOnlyQueryInput) => { + if (input.sql.includes('"orders"')) { + throw new Error('orders unavailable'); + } + return { + headers: [ + 'column_name', + 'table_row_count', + 'row_count', + 'null_count', + 'distinct_count', + 'min_text_length', + 'max_text_length', + 'sample_values', + ], + rows: [['id', 2, 2, 0, 2, 1, 2, '1\u001f2']], + totalRows: 1, + rowCount: 1, + }; + }), + }; + + const result = await profileKtxRelationshipSchema({ + connectionId: 'warehouse', + driver: 'sqlite', + schema: schemaWithTables(['accounts', 'orders']), + executor, + ctx: { runId: 'profile-error-isolated' }, + profileConcurrency: 2, + }); + + expect(result.warnings).toContain('profile_failed:orders:orders unavailable'); + expect(result.tables).toHaveLength(2); + expect(Object.keys(result.columns)).toContain('accounts.id'); + }); }); + +function schemaWithTables(names: string[]): KtxEnrichedSchema { + return schema( + names.map((name) => + table(name, [ + column(name, name === 'orders' ? 'account_id' : 'id', { + nullable: false, + primaryKey: name !== 'orders', + }), + ]), + ), + ); +} diff --git a/packages/cli/src/context/scan/relationship-profiling.ts b/packages/cli/src/context/scan/relationship-profiling.ts index 2172ac24..f4f648a6 100644 --- a/packages/cli/src/context/scan/relationship-profiling.ts +++ b/packages/cli/src/context/scan/relationship-profiling.ts @@ -1,4 +1,5 @@ import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js'; +import { mapWithConcurrency } from './relationship-validation.js'; import type { KtxConnectionDriver, KtxQueryResult, @@ -60,6 +61,7 @@ export interface ProfileKtxRelationshipSchemaInput { ctx: KtxScanContext; sampleValuesPerColumn?: number; profileSampleRows?: number; + profileConcurrency?: number; cache?: KtxRelationshipProfileCache; } @@ -406,7 +408,8 @@ export async function profileKtxRelationshipSchema( const columns: Record = {}; const warnings: string[] = []; - for (const table of input.schema.tables.filter((candidate) => candidate.enabled)) { + const enabledTables = input.schema.tables.filter((candidate) => candidate.enabled); + const tableResults = await mapWithConcurrency(enabledTables, input.profileConcurrency ?? 4, async (table) => { const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5; const profileSampleRows = input.profileSampleRows ?? 10000; const cacheKey = tableProfileCacheKey({ @@ -419,12 +422,7 @@ export async function profileKtxRelationshipSchema( }); const cached = input.cache?.tableProfiles.get(cacheKey); if (cached) { - tables.push(cached.table); - Object.assign(columns, cached.columns); - for (const warning of cached.warnings) { - warnings.push(warning); - } - continue; + return { cached, queryCount: 0 }; } try { @@ -437,22 +435,35 @@ export async function profileKtxRelationshipSchema( sampleValuesPerColumn, profileSampleRows, }); - queryTotal += tableProfile.queryCount; - tables.push(tableProfile.table); - Object.assign(columns, tableProfile.columns); input.cache?.tableProfiles.set(cacheKey, { table: tableProfile.table, columns: tableProfile.columns, warnings: [], }); + return { tableProfile }; } catch (error) { const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`; - warnings.push(failureWarning); - input.cache?.tableProfiles.set(cacheKey, { + const cachedFailure = { table: { table: table.ref, rowCount: 0 }, columns: {}, warnings: [failureWarning], - }); + }; + input.cache?.tableProfiles.set(cacheKey, cachedFailure); + return { cached: cachedFailure, queryCount: 0 }; + } + }); + + for (const result of tableResults) { + if ('tableProfile' in result) { + queryTotal += result.tableProfile.queryCount; + tables.push(result.tableProfile.table); + Object.assign(columns, result.tableProfile.columns); + continue; + } + tables.push(result.cached.table); + Object.assign(columns, result.cached.columns); + for (const warning of result.cached.warnings) { + warnings.push(warning); } } diff --git a/packages/cli/src/context/scan/relationship-validation.ts b/packages/cli/src/context/scan/relationship-validation.ts index 63d7328a..685d1ea9 100644 --- a/packages/cli/src/context/scan/relationship-validation.ts +++ b/packages/cli/src/context/scan/relationship-validation.ts @@ -193,7 +193,7 @@ function statusFor(input: { return 'rejected'; } -async function mapWithConcurrency( +export async function mapWithConcurrency( inputs: readonly TInput[], concurrency: number, mapOne: (input: TInput) => Promise,