feat(scan): parallelize relationship profiling

This commit is contained in:
Andrey Avtomonov 2026-05-22 17:00:26 +02:00
parent 21188c7f51
commit 48c0b3bb3b
9 changed files with 135 additions and 15 deletions

View file

@ -74,6 +74,7 @@ connections:
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
},
@ -278,6 +279,7 @@ scan:
maxLlmTablesPerBatch: 12
maxCandidatesPerColumn: 7
profileSampleRows: 500
profileConcurrency: 3
validationConcurrency: 2
validationBudget: 0
`);
@ -291,6 +293,7 @@ scan:
maxLlmTablesPerBatch: 12,
maxCandidatesPerColumn: 7,
profileSampleRows: 500,
profileConcurrency: 3,
validationConcurrency: 2,
validationBudget: 0,
});
@ -302,6 +305,7 @@ scan:
expect(serializeKtxProjectConfig(config)).toContain('maxLlmTablesPerBatch: 12');
expect(serializeKtxProjectConfig(config)).toContain('maxCandidatesPerColumn: 7');
expect(serializeKtxProjectConfig(config)).toContain('profileSampleRows: 500');
expect(serializeKtxProjectConfig(config)).toContain('profileConcurrency: 3');
expect(serializeKtxProjectConfig(config)).toContain('validationConcurrency: 2');
expect(serializeKtxProjectConfig(config)).toContain('validationBudget: 0');
});
@ -326,6 +330,7 @@ scan:
maxLlmTablesPerBatch: 0
maxCandidatesPerColumn: -4
profileSampleRows: 0
profileConcurrency: 0
validationConcurrency: 0
validationBudget: 1.5
`;
@ -341,6 +346,7 @@ scan:
'scan.relationships.maxLlmTablesPerBatch',
'scan.relationships.maxCandidatesPerColumn',
'scan.relationships.profileSampleRows',
'scan.relationships.profileConcurrency',
'scan.relationships.validationConcurrency',
'scan.relationships.validationBudget',
]),

View file

@ -163,6 +163,11 @@ const scanRelationshipsSchema = z
.default(25)
.describe('Maximum number of candidate join partners considered per column during relationship discovery.'),
profileSampleRows: z.int().positive().default(10000).describe('Number of rows sampled per table when profiling values for relationship inference.'),
profileConcurrency: z
.int()
.positive()
.default(4)
.describe('Parallel relationship-profile queries run against the database during scan.'),
validationConcurrency: z.int().positive().default(4).describe('Number of relationship validation queries run in parallel against the database.'),
validationBudget: z
.union([z.literal('all'), z.int().nonnegative()])

View file

@ -289,6 +289,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
maxLlmTablesPerBatch: 12,
maxCandidatesPerColumn: 7,
profileSampleRows: 500,
profileConcurrency: 3,
validationConcurrency: 2,
},
});
@ -378,6 +379,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
validationRequiredForManifest: true,
maxCandidatesPerColumn: 7,
profileSampleRows: 500,
profileConcurrency: 3,
validationConcurrency: 2,
},
profileWarnings: [],
@ -472,6 +474,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
dryRun: false,
@ -741,6 +744,7 @@ describe('writeLocalScanEnrichmentArtifacts', () => {
maxLlmTablesPerBatch: 40,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
},
dryRun: false,

View file

@ -382,6 +382,7 @@ export async function writeLocalScanEnrichmentArtifacts(
validationRequiredForManifest: input.relationshipSettings.validationRequiredForManifest,
maxCandidatesPerColumn: input.relationshipSettings.maxCandidatesPerColumn,
profileSampleRows: input.relationshipSettings.profileSampleRows,
profileConcurrency: input.relationshipSettings.profileConcurrency,
validationConcurrency: input.relationshipSettings.validationConcurrency,
}
: undefined,

View file

@ -70,6 +70,7 @@ interface KtxRelationshipDiagnosticsPolicy {
validationRequiredForManifest: boolean;
maxCandidatesPerColumn: number;
profileSampleRows: number;
profileConcurrency: number;
validationConcurrency: number;
}
@ -118,6 +119,7 @@ const DEFAULT_POLICY: KtxRelationshipDiagnosticsPolicy = {
validationRequiredForManifest: true,
maxCandidatesPerColumn: 25,
profileSampleRows: 10000,
profileConcurrency: 4,
validationConcurrency: 4,
};

View file

@ -228,6 +228,7 @@ export async function discoverKtxRelationships(
executor,
ctx: input.context,
profileSampleRows: input.settings.profileSampleRows,
profileConcurrency: input.settings.profileConcurrency,
cache: profileCache,
});
const deterministicCandidates: KtxRelationshipDiscoveryCandidate[] = generateKtxRelationshipDiscoveryCandidates(

View file

@ -1,7 +1,7 @@
import { readFile } from 'node:fs/promises';
import { join } from 'node:path';
import Database from 'better-sqlite3';
import { afterEach, describe, expect, it } from 'vitest';
import { afterEach, describe, expect, it, vi } from 'vitest';
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
import { snapshotToKtxEnrichedSchema } from './local-enrichment.js';
import { loadKtxRelationshipBenchmarkFixture, maskKtxRelationshipBenchmarkSnapshot } from './relationship-benchmarks.js';
@ -351,4 +351,94 @@ describe('relationship profiling', () => {
scaleExecutor.close();
}
});
it('profiles tables concurrently up to profileConcurrency', async () => {
  // Counters for overlapping mock queries: `inFlight` is the number currently
  // awaiting the timer, `maxInFlight` is the highest overlap observed.
  let inFlight = 0;
  let maxInFlight = 0;
  const executor = {
    executeReadOnly: vi.fn(async (input: KtxReadOnlyQueryInput) => {
      inFlight += 1;
      maxInFlight = Math.max(maxInFlight, inFlight);
      // Hold the query open briefly so queries started together overlap.
      await new Promise((resolve) => setTimeout(resolve, 10));
      inFlight -= 1;
      return {
        headers: [
          'column_name',
          'table_row_count',
          'row_count',
          'null_count',
          'distinct_count',
          'min_text_length',
          'max_text_length',
          'sample_values',
        ],
        // schemaWithTables names the only column `account_id` for `orders`
        // and `id` for every other table; keep the mock row consistent.
        rows: [[input.sql.includes('orders') ? 'account_id' : 'id', 2, 2, 0, 2, 1, 2, '1\u001f2']],
        totalRows: 1,
        rowCount: 1,
      };
    }),
  };
  await profileKtxRelationshipSchema({
    connectionId: 'warehouse',
    driver: 'sqlite',
    // More tables than the concurrency limit: an unbounded implementation
    // would peak at 6 in-flight queries, so asserting 4 checks the cap as
    // well as the parallelism.
    schema: schemaWithTables(['accounts', 'orders', 'payments', 'refunds', 'invoices', 'shipments']),
    executor,
    ctx: { runId: 'profile-concurrency' },
    profileConcurrency: 4,
  });
  expect(maxInFlight).toBe(4);
});
it('keeps profiling other tables when one table profile fails', async () => {
  // Builds a fresh successful single-row profile result for each call.
  const successfulProfile = () => ({
    headers: [
      'column_name',
      'table_row_count',
      'row_count',
      'null_count',
      'distinct_count',
      'min_text_length',
      'max_text_length',
      'sample_values',
    ],
    rows: [['id', 2, 2, 0, 2, 1, 2, '1\u001f2']],
    totalRows: 1,
    rowCount: 1,
  });
  // Any query whose SQL mentions the quoted orders table is rejected;
  // everything else resolves with the canned profile row.
  const executeReadOnly = vi.fn(async (query: KtxReadOnlyQueryInput) => {
    const targetsOrders = query.sql.includes('"orders"');
    if (!targetsOrders) {
      return successfulProfile();
    }
    throw new Error('orders unavailable');
  });
  const executor = { executeReadOnly };

  const profiled = await profileKtxRelationshipSchema({
    connectionId: 'warehouse',
    driver: 'sqlite',
    schema: schemaWithTables(['accounts', 'orders']),
    executor,
    ctx: { runId: 'profile-error-isolated' },
    profileConcurrency: 2,
  });

  // The failure surfaces as a warning while both tables are still reported
  // and the healthy table keeps its column profile.
  expect(profiled.warnings).toContain('profile_failed:orders:orders unavailable');
  expect(profiled.tables).toHaveLength(2);
  const profiledColumnKeys = Object.keys(profiled.columns);
  expect(profiledColumnKeys).toContain('accounts.id');
});
});
/**
 * Builds a test schema containing one single-column table per given name,
 * using this file's `schema`, `table` and `column` fixture helpers.
 *
 * The `orders` table gets a non-nullable, non-primary-key `account_id`
 * column; every other table gets a non-nullable primary-key `id` column.
 *
 * @param names Table names, in the order the tables should appear.
 * @returns The schema produced by the `schema` helper for those tables.
 */
function schemaWithTables(names: string[]): KtxEnrichedSchema {
  const tables = names.map((tableName) => {
    const isOrders = tableName === 'orders';
    const columnName = isOrders ? 'account_id' : 'id';
    const onlyColumn = column(tableName, columnName, {
      nullable: false,
      primaryKey: !isOrders,
    });
    return table(tableName, [onlyColumn]);
  });
  return schema(tables);
}

View file

@ -1,4 +1,5 @@
import type { KtxEnrichedColumn, KtxEnrichedSchema, KtxEnrichedTable } from './enrichment-types.js';
import { mapWithConcurrency } from './relationship-validation.js';
import type {
KtxConnectionDriver,
KtxQueryResult,
@ -60,6 +61,7 @@ export interface ProfileKtxRelationshipSchemaInput {
ctx: KtxScanContext;
sampleValuesPerColumn?: number;
profileSampleRows?: number;
profileConcurrency?: number;
cache?: KtxRelationshipProfileCache;
}
@ -406,7 +408,8 @@ export async function profileKtxRelationshipSchema(
const columns: Record<string, KtxRelationshipColumnProfile> = {};
const warnings: string[] = [];
for (const table of input.schema.tables.filter((candidate) => candidate.enabled)) {
const enabledTables = input.schema.tables.filter((candidate) => candidate.enabled);
const tableResults = await mapWithConcurrency(enabledTables, input.profileConcurrency ?? 4, async (table) => {
const sampleValuesPerColumn = input.sampleValuesPerColumn ?? 5;
const profileSampleRows = input.profileSampleRows ?? 10000;
const cacheKey = tableProfileCacheKey({
@ -419,12 +422,7 @@ export async function profileKtxRelationshipSchema(
});
const cached = input.cache?.tableProfiles.get(cacheKey);
if (cached) {
tables.push(cached.table);
Object.assign(columns, cached.columns);
for (const warning of cached.warnings) {
warnings.push(warning);
}
continue;
return { cached, queryCount: 0 };
}
try {
@ -437,22 +435,35 @@ export async function profileKtxRelationshipSchema(
sampleValuesPerColumn,
profileSampleRows,
});
queryTotal += tableProfile.queryCount;
tables.push(tableProfile.table);
Object.assign(columns, tableProfile.columns);
input.cache?.tableProfiles.set(cacheKey, {
table: tableProfile.table,
columns: tableProfile.columns,
warnings: [],
});
return { tableProfile };
} catch (error) {
const failureWarning = `profile_failed:${table.ref.name}:${error instanceof Error ? error.message : String(error)}`;
warnings.push(failureWarning);
input.cache?.tableProfiles.set(cacheKey, {
const cachedFailure = {
table: { table: table.ref, rowCount: 0 },
columns: {},
warnings: [failureWarning],
});
};
input.cache?.tableProfiles.set(cacheKey, cachedFailure);
return { cached: cachedFailure, queryCount: 0 };
}
});
for (const result of tableResults) {
if ('tableProfile' in result) {
queryTotal += result.tableProfile.queryCount;
tables.push(result.tableProfile.table);
Object.assign(columns, result.tableProfile.columns);
continue;
}
tables.push(result.cached.table);
Object.assign(columns, result.cached.columns);
for (const warning of result.cached.warnings) {
warnings.push(warning);
}
}

View file

@ -193,7 +193,7 @@ function statusFor(input: {
return 'rejected';
}
async function mapWithConcurrency<TInput, TOutput>(
export async function mapWithConcurrency<TInput, TOutput>(
inputs: readonly TInput[],
concurrency: number,
mapOne: (input: TInput) => Promise<TOutput>,