feat(scan): batch table description generation

This commit is contained in:
Andrey Avtomonov 2026-05-22 17:03:58 +02:00
parent 48c0b3bb3b
commit a57deb670b
4 changed files with 425 additions and 77 deletions

View file

@ -378,6 +378,120 @@ describe('KtxDescriptionGenerator', () => {
expect(cache.set).toHaveBeenCalledWith('warehouse.public.orders', 'Commerce orders');
expect(cache.set).toHaveBeenCalledWith('__connection:Warehouse', 'Commerce orders');
});
// Happy path: a single structured LLM response covers the table and every column,
// so no per-column sampling or free-text generation should be needed.
it('generates one structured table description and reuses table samples for all columns', async () => {
const llmRuntime = createLlmProvider('unused');
// The structured response describes both input columns, leaving nothing for the fallback path.
llmRuntime.generateObject = vi.fn(async () => ({
tableDescription: 'Commerce orders',
columns: [
{ name: 'status', description: 'Current order state' },
{ name: 'amount', description: 'Order amount in dollars' },
],
}));
// NOTE(review): createConnector() is defined outside this view; presumably it exposes
// mocked sampleTable/sampleColumn functions, which the assertions below rely on.
const connector = createConnector();
const generator = new KtxDescriptionGenerator({
llmRuntime,
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateBatchedTableDescriptions({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
rawDescriptions: { db: 'Orders fact table' },
columns: [
{ name: 'status', type: 'text' },
{ name: 'amount', type: 'numeric' },
],
},
});
expect(result.tableDescription).toBe('Commerce orders');
// columnDescriptions is a Map; convert it to a plain object for a readable comparison.
expect(Object.fromEntries(result.columnDescriptions)).toEqual({
status: 'Current order state',
amount: 'Order amount in dollars',
});
// One table-level sample, no per-column samples.
expect(connector.sampleTable).toHaveBeenCalledTimes(1);
expect(connector.sampleColumn).not.toHaveBeenCalled();
// One structured call, no free-text fallback calls.
expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1);
expect(llmRuntime.generateText).not.toHaveBeenCalled();
});
// Partial structured output: columns the structured response omits must be filled in
// by exactly one generateText call each, without any per-column sampling.
it('falls back to one column generateText call for each missing structured column', async () => {
// NOTE(review): createLlmProvider is defined outside this view; presumably its argument is
// the text returned by generateText, which is what the 'status' expectation below relies on.
const llmRuntime = createLlmProvider('Fallback status');
// The structured response deliberately omits the 'status' column.
llmRuntime.generateObject = vi.fn(async () => ({
tableDescription: 'Commerce orders',
columns: [{ name: 'amount', description: 'Order amount in dollars' }],
}));
const connector = createConnector();
const generator = new KtxDescriptionGenerator({
llmRuntime,
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateBatchedTableDescriptions({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [
{ name: 'status', type: 'text' },
{ name: 'amount', type: 'numeric' },
],
},
});
// 'amount' comes from the structured output, 'status' from the free-text fallback.
expect(Object.fromEntries(result.columnDescriptions)).toEqual({
status: 'Fallback status',
amount: 'Order amount in dollars',
});
expect(connector.sampleColumn).not.toHaveBeenCalled();
expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1);
// Exactly one fallback call, for the single missing column.
expect(llmRuntime.generateText).toHaveBeenCalledTimes(1);
});
// Structured-call failure: the generator must not throw; it reports a warning, returns
// no table description, and still describes columns through the free-text fallback.
it('tolerates structured object failures and falls back to prepared column values', async () => {
const llmRuntime = createLlmProvider('Fallback description');
// Simulate a runtime that cannot produce structured output at all.
llmRuntime.generateObject = vi.fn(async () => {
throw new Error('object output unavailable');
});
// Collect only the warning codes so the assertion stays independent of message wording.
const warnings: string[] = [];
const generator = new KtxDescriptionGenerator({
llmRuntime,
onWarning: (warning) => warnings.push(warning.code),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateBatchedTableDescriptions({
connectionId: 'conn-1',
// NOTE(review): the fallback only calls generateText when the column has sample values or
// raw descriptions, so createConnector() presumably returns sample rows containing 'status'.
connector: createConnector(),
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [{ name: 'status', type: 'text' }],
},
});
// No structured output means no table description.
expect(result.tableDescription).toBeNull();
expect(Object.fromEntries(result.columnDescriptions)).toEqual({ status: 'Fallback description' });
expect(warnings).toContain('enrichment_failed');
expect(llmRuntime.generateText).toHaveBeenCalledTimes(1);
});
});
describe('KtxDescriptionGenerator resilience', () => {

View file

@ -1,4 +1,5 @@
import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import { z } from 'zod';
import type {
KtxColumnSampleInput,
KtxColumnSampleResult,
@ -53,7 +54,7 @@ export interface KtxDescriptionColumn {
sampleValues?: unknown[];
}
export interface KtxDescriptionColumnTable extends KtxTableRef {
interface KtxDescriptionColumnTable extends KtxTableRef {
columns: KtxDescriptionColumn[];
}
@ -112,6 +113,23 @@ export interface KtxGenerateTableDescriptionInput {
table: KtxDescriptionTableInput;
}
/**
 * Input for `KtxDescriptionGenerator.generateBatchedTableDescriptions`, which describes
 * one table and all of its columns in a single structured LLM call.
 */
export interface KtxGenerateBatchedTableDescriptionsInput {
/** Connection identifier forwarded to the connector's `sampleTable` call. */
connectionId: string;
/** Sampling port; `sampleTable` is optional and its absence triggers a metadata-only prompt. */
connector: KtxDescriptionSamplingPort;
/** Scan context forwarded to sampling; its `signal` is passed to the sampling retry helper. */
context: KtxScanContext;
/** Data source type label embedded in the prompts. */
dataSourceType: string;
/** Forwarded to the per-column fallback prompt builder. */
supportsNestedAnalysis: boolean;
/** Table to describe, with optional table-level raw descriptions and per-column type/comment metadata. */
table: KtxDescriptionColumnTable & {
rawDescriptions?: Record<string, string>;
columns: Array<KtxDescriptionColumn & { type?: string; comment?: string | null }>;
};
}
/** Result of `KtxDescriptionGenerator.generateBatchedTableDescriptions`. */
export interface KtxBatchedTableDescriptionsResult {
/** Trimmed table description, or `null` when the structured call failed or returned blank text. */
tableDescription: string | null;
/** Descriptions keyed by the input column name; `null` when no description could be produced. */
columnDescriptions: Map<string, string | null>;
}
export interface KtxGenerateDataSourceDescriptionInput {
connectionId: string;
connector: KtxDescriptionSamplingPort;
@ -136,6 +154,18 @@ interface ColumnTaskResult {
skipped: boolean;
}
/**
 * Zod schema passed to `llmRuntime.generateObject` for the batched table call:
 * one table description plus a `{ name, description }` entry per described column.
 * The consumer matches `name` against input columns case-insensitively and treats
 * blank descriptions as missing.
 */
const batchedTableDescriptionSchema = z.object({
tableDescription: z.string(),
columns: z.array(
z.object({
name: z.string(),
description: z.string(),
}),
),
});
/** Static type of the structured output, derived from the schema so the two cannot drift. */
type BatchedTableDescriptionOutput = z.infer<typeof batchedTableDescriptionSchema>;
function descriptionSources(rawDescriptions: Record<string, string> | undefined): Array<[string, string]> {
if (!rawDescriptions) {
return [];
@ -250,6 +280,76 @@ function wordLimitLine(maxWords: number): string {
return `Please provide a concise description in ${maxWords} words or less.`;
}
/**
 * Collects the non-null sample values known for each column of a table.
 *
 * Values already attached to a column (`column.sampleValues`) are recorded first.
 * When `sampleData` is available, the values found in the matching sample column
 * (header matched case-insensitively, first matching header wins) replace them,
 * provided at least one non-null value was sampled.
 *
 * @param columns - Columns to collect values for; the result is keyed by `column.name`.
 * @param sampleData - Table sample (headers plus rows), or `null` when sampling was unavailable.
 * @returns Map from column name to its non-null values. Columns with no values are absent.
 */
function sampleValuesByColumn(
  columns: readonly KtxDescriptionColumn[],
  sampleData: KtxTableSampleResult | null,
): Map<string, unknown[]> {
  const isPresent = (value: unknown): boolean => value !== null && value !== undefined;

  const values = new Map<string, unknown[]>();
  for (const column of columns) {
    const existingValues = column.sampleValues?.filter(isPresent) ?? [];
    if (existingValues.length > 0) {
      values.set(column.name, existingValues);
    }
  }
  if (!sampleData) {
    return values;
  }

  // Index headers once instead of rescanning and re-lowercasing every header for every
  // column. Only the first occurrence of a header is kept, matching findIndex semantics.
  const headerIndex = new Map<string, number>();
  sampleData.headers.forEach((header, index) => {
    const key = header.toLowerCase();
    if (!headerIndex.has(key)) {
      headerIndex.set(key, index);
    }
  });

  for (const column of columns) {
    const index = headerIndex.get(column.name.toLowerCase());
    if (index === undefined) {
      continue;
    }
    const sampledValues = sampleData.rows.map((row) => row[index]).filter(isPresent);
    // An all-null sampled column must not wipe out values the column already carried.
    if (sampledValues.length > 0) {
      values.set(column.name, sampledValues);
    }
  }
  return values;
}
/**
 * Renders one sampled cell for the prompt's sample-row lines.
 *
 * `null`/`undefined` become an empty string, valid dates become ISO strings, and plain
 * objects and arrays are rendered as JSON instead of the useless `[object Object]`.
 * Everything else falls back to `String(value)`.
 */
function formatBatchedSampleCell(value: unknown): string {
  if (value === null || value === undefined) {
    return '';
  }
  if (value instanceof Date) {
    // toISOString throws on invalid dates, so keep the plain string form for those.
    return Number.isNaN(value.getTime()) ? String(value) : value.toISOString();
  }
  if (typeof value === 'object') {
    const prototype: unknown = Object.getPrototypeOf(value);
    if (Array.isArray(value) || prototype === Object.prototype || prototype === null) {
      try {
        // JSON.stringify throws on bigint, so render nested bigints as decimal strings.
        const json = JSON.stringify(value, (_key, nested: unknown) =>
          typeof nested === 'bigint' ? nested.toString() : nested,
        );
        if (json !== undefined) {
          return json;
        }
      } catch {
        // Circular structures cannot be serialized; fall through to String().
      }
    }
  }
  return String(value);
}

/**
 * Builds the system and user prompts for the batched (table plus all columns) structured call.
 *
 * The user prompt lists the table name, data source type, one `- name (type) - comment`
 * line per column, the table-level database comment when one is supplied, and up to five
 * sample rows (or `unavailable` when there are none).
 *
 * @param input.table - Table metadata; `rawDescriptions.db` is used as the table comment, and each
 *   column's `rawDescriptions.db` (falling back to `comment`) as the column comment.
 * @param input.sampleData - Sampled rows, or `null` when sampling was not possible.
 * @param input.dataSourceType - Data source type label embedded in the prompt.
 * @param input.tableMaxWords - Word limit stated for the table description.
 * @param input.columnMaxWords - Word limit stated for each column description.
 * @returns The `{ system, user }` prompt pair.
 */
function batchedPrompt(input: {
  table: KtxGenerateBatchedTableDescriptionsInput['table'];
  sampleData: KtxTableSampleResult | null;
  dataSourceType: string;
  tableMaxWords: number;
  columnMaxWords: number;
}): KtxDescriptionPrompt {
  const { table, sampleData } = input;

  // Comments are free text; collapse line breaks so every column stays on exactly one line.
  const singleLine = (text: string | null | undefined): string =>
    (text ?? '').replace(/\s*[\r\n]+\s*/g, ' ').trim();

  const columnLines = table.columns
    .map((column) => {
      const typePart = column.type ? ` (${column.type})` : '';
      // `||` on purpose: an empty raw description should fall through to `comment`.
      const comment = singleLine(column.rawDescriptions?.db) || singleLine(column.comment);
      const commentPart = comment ? ` - ${comment}` : '';
      return `- ${column.name}${typePart}${commentPart}`;
    })
    .join('\n');

  const sampleLines =
    sampleData && sampleData.rows.length > 0
      ? sampleData.rows
          .slice(0, 5)
          .map((row) =>
            sampleData.headers
              .map((header, index) => `${header}=${formatBatchedSampleCell(row[index])}`)
              .join(', '),
          )
          .join('\n')
      : 'unavailable';

  // Callers pass the table's database comment here; it carries real signal for the description.
  const tableComment = singleLine(table.rawDescriptions?.db);

  return {
    system: [
      'Analyze one database table and return structured JSON matching the supplied schema.',
      `The table description must be ${input.tableMaxWords} words or less.`,
      `Each column description must be ${input.columnMaxWords} words or less.`,
      'Describe business meaning directly. Do not repeat table or column names as filler.',
    ].join('\n'),
    user: [
      `Table: ${table.name}`,
      `Data source type: ${input.dataSourceType}`,
      'Columns:',
      columnLines,
      ...(tableComment ? [`Table comment: ${tableComment}`] : []),
      'Sample rows:',
      sampleLines,
    ].join('\n'),
  };
}
/** @internal */
export function buildKtxColumnDescriptionPrompt(
input: KtxColumnDescriptionPromptInput & { maxWords?: number },
@ -582,6 +682,147 @@ export class KtxDescriptionGenerator {
}
}
/**
 * Describes one table and all of its columns with a single structured LLM call.
 *
 * Steps:
 * 1. Sample up to 20 rows through `connector.sampleTable` (three attempts). When the
 *    connector lacks the capability, sampling fails, or the sample is empty, the
 *    prompt is built from metadata only.
 * 2. Ask the LLM runtime for a structured `{ tableDescription, columns }` object and
 *    map the returned columns back to the input columns case-insensitively.
 * 3. For every column still lacking a description (omitted, blank, or the structured
 *    call failed), fall back to the per-column text prompt, using values taken from
 *    the table sample.
 *
 * Sampling and structured-call failures are reported through the logger and
 * `onWarning` and never reject, with one exception: cancellation.
 *
 * @param input - Connection, connector, scan context, and the table to describe.
 * @returns The table description (`null` when unavailable) and a map of column
 *   descriptions keyed by input column name.
 * @throws KtxAbortedError when sampling or the structured call surfaces a cancellation.
 */
async generateBatchedTableDescriptions(
  input: KtxGenerateBatchedTableDescriptionsInput,
): Promise<KtxBatchedTableDescriptionsResult> {
  const tableRef = toTableRef(input.table);
  let sampleData: KtxTableSampleResult | null = null;
  // Why sample rows are missing from the prompt, if they are; reported once a description exists.
  let fallbackReason: 'capability_missing' | 'sampling_failed' | 'empty_sample' | null = null;

  if (!input.connector.sampleTable) {
    fallbackReason = 'capability_missing';
    this.logger?.warn('KTX scan connector does not support table sampling; falling back to metadata-only prompt', {
      connectorId: input.connector.id,
      table: input.table.name,
    });
    this.onWarning?.({
      code: 'connector_capability_missing',
      message: `Connector ${input.connector.id} does not support sampleTable; using metadata-only description prompt`,
      table: input.table.name,
      recoverable: true,
      metadata: { connectorId: input.connector.id, capability: 'sampleTable' },
    });
  } else {
    try {
      sampleData = await retryAsync(
        () =>
          // Invoked on the connector (not through a detached reference) so `this` stays bound.
          input.connector.sampleTable!(
            {
              connectionId: input.connectionId,
              table: tableRef,
              limit: 20,
            },
            input.context,
          ),
        {
          attempts: 3,
          baseDelayMs: 200,
          signal: input.context.signal,
          onAttemptFailure: (error, attempt) => {
            this.logger?.warn(`sampleTable attempt ${attempt} failed for ${input.table.name}: ${errorMessage(error)}`, {
              connectorId: input.connector.id,
              table: input.table.name,
              attempt,
            });
          },
        },
      );
      if (sampleData.rows.length === 0) {
        fallbackReason = 'empty_sample';
        this.logger?.warn('sampleTable returned no rows; using metadata-only prompt', {
          connectorId: input.connector.id,
          table: input.table.name,
        });
      }
    } catch (error) {
      // Cancellation is not a sampling failure; let it stop the scan.
      if (error instanceof KtxAbortedError) {
        throw error;
      }
      fallbackReason = 'sampling_failed';
      this.logger?.error(`sampleTable exhausted retries for ${input.table.name}: ${errorMessage(error)}`, {
        connectorId: input.connector.id,
        table: input.table.name,
      });
      this.onWarning?.({
        code: 'sampling_failed',
        message: `Failed to sample table ${input.table.name} after retries: ${errorMessage(error)}`,
        table: input.table.name,
        recoverable: true,
        metadata: { connectorId: input.connector.id, error: errorMessage(error) },
      });
    }
  }

  // Per-column values from the one table sample, reused by the per-column fallback below.
  const sampleValues = sampleValuesByColumn(input.table.columns, sampleData);
  const descriptions = new Map<string, string | null>();
  let tableDescription: string | null = null;

  try {
    const prompt = batchedPrompt({
      table: input.table,
      sampleData,
      dataSourceType: input.dataSourceType,
      tableMaxWords: this.settings.tableMaxWords,
      columnMaxWords: this.settings.columnMaxWords,
    });
    const generated = await this.llmRuntime.generateObject<
      BatchedTableDescriptionOutput,
      typeof batchedTableDescriptionSchema
    >({
      role: 'candidateExtraction',
      system: prompt.system,
      prompt: prompt.user,
      schema: batchedTableDescriptionSchema,
      temperature: this.settings.temperature,
    });
    tableDescription = generated.tableDescription.trim() || null;
    // Models do not reliably preserve identifier casing, so match names case-insensitively.
    const generatedColumns = new Map(
      generated.columns.map((column) => [column.name.toLowerCase(), column.description.trim() || null]),
    );
    for (const column of input.table.columns) {
      const description = generatedColumns.get(column.name.toLowerCase()) ?? null;
      descriptions.set(column.name, description);
    }
    if (tableDescription && fallbackReason !== null) {
      this.onWarning?.({
        code: 'description_fallback_used',
        message: `Generated table description without sample rows for ${input.table.name} (reason: ${fallbackReason})`,
        table: input.table.name,
        recoverable: true,
        metadata: { connectorId: input.connector.id, reason: fallbackReason },
      });
    }
  } catch (error) {
    // Mirror the sampling path: a cancellation must propagate rather than being reported
    // as an enrichment failure and followed by one fallback LLM call per column.
    if (error instanceof KtxAbortedError) {
      throw error;
    }
    this.logger?.warn(`Batched table description failed for ${input.table.name}: ${errorMessage(error)}`, {
      connectorId: input.connector.id,
      table: input.table.name,
    });
    this.onWarning?.({
      code: 'enrichment_failed',
      message: `Failed to generate batched description for table ${input.table.name}: ${errorMessage(error)}`,
      table: input.table.name,
      recoverable: true,
      metadata: { connectorId: input.connector.id },
    });
  }

  const tableContext = `Table: ${input.table.name} | Columns: ${input.table.columns.map((column) => column.name).join(', ')} | Data source: ${input.dataSourceType}`;
  for (const column of input.table.columns) {
    // Already described by the structured call.
    if (descriptions.get(column.name)) {
      continue;
    }
    const fallback = await this.generateColumnDescriptionFromPreparedValues({
      column,
      columnValues: sampleValues.get(column.name) ?? [],
      tableContext,
      dataSourceType: input.dataSourceType,
      supportsNestedAnalysis: input.supportsNestedAnalysis,
    });
    descriptions.set(column.name, fallback);
  }

  return { tableDescription, columnDescriptions: descriptions };
}
async generateDataSourceDescription(input: KtxGenerateDataSourceDescriptionInput): Promise<string | null> {
if (input.tables.length === 0) {
return 'No tables found in database';
@ -732,27 +973,13 @@ export class KtxDescriptionGenerator {
}
}
const nonNullValues = (columnValues ?? []).filter((value) => value !== null && value !== undefined);
const hasRawDescriptions = descriptionSources(column.rawDescriptions).length > 0;
if (nonNullValues.length === 0 && !hasRawDescriptions) {
return {
columnName: column.name,
description: null,
skipped: false,
processed: false,
};
}
const prompt = buildKtxColumnDescriptionPrompt({
columnName: column.name,
columnValues: nonNullValues,
const description = await this.generateColumnDescriptionFromPreparedValues({
column,
columnValues: columnValues ?? [],
tableContext,
dataSourceType: input.dataSourceType,
supportsNestedAnalysis: input.supportsNestedAnalysis,
rawDescriptions: column.rawDescriptions,
maxWords: this.settings.columnMaxWords,
});
const description = await this.generateAiDescription(prompt, 'ktx-column-description');
if (cacheKey && description) {
await this.cache?.set(cacheKey, description);
@ -782,6 +1009,30 @@ export class KtxDescriptionGenerator {
}
}
/**
 * Generates one column description from values the caller has already collected,
 * without sampling the data source.
 *
 * @param input.column - Column to describe; its raw descriptions feed the prompt.
 * @param input.columnValues - Candidate sample values; null and undefined entries are ignored.
 * @param input.tableContext - One-line table summary included in the prompt.
 * @param input.dataSourceType - Data source type label included in the prompt.
 * @param input.supportsNestedAnalysis - Forwarded to the column prompt builder.
 * @returns The generated description, or `null` when there is nothing to base one on
 *   (no usable values and no raw descriptions) or generation yields nothing.
 */
private async generateColumnDescriptionFromPreparedValues(input: {
  column: KtxDescriptionColumn;
  columnValues: unknown[];
  tableContext: string;
  dataSourceType: string;
  supportsNestedAnalysis: boolean;
}): Promise<string | null> {
  const { column, columnValues, tableContext, dataSourceType, supportsNestedAnalysis } = input;

  const usableValues = columnValues.filter((candidate) => !(candidate === null || candidate === undefined));
  const rawSourceCount = descriptionSources(column.rawDescriptions).length;

  // With neither values nor raw descriptions the prompt would have nothing to describe.
  const hasAnyEvidence = usableValues.length > 0 || rawSourceCount > 0;
  if (!hasAnyEvidence) {
    return null;
  }

  const columnPrompt = buildKtxColumnDescriptionPrompt({
    columnName: column.name,
    columnValues: usableValues,
    tableContext,
    dataSourceType,
    supportsNestedAnalysis,
    rawDescriptions: column.rawDescriptions,
    maxWords: this.settings.columnMaxWords,
  });
  return this.generateAiDescription(columnPrompt, 'ktx-column-description');
}
private async generateAiDescription(prompt: KtxDescriptionPrompt, _operationName: string): Promise<string | null> {
try {
const text = await this.llmRuntime.generateText({

View file

@ -505,7 +505,7 @@ describe('local scan enrichment', () => {
expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 });
});
it('generates table descriptions with bounded table-level concurrency', async () => {
it('generates batched table descriptions with bounded table-level concurrency', async () => {
const concurrentSnapshot: KtxSchemaSnapshot = {
...snapshot,
tables: Array.from({ length: 8 }, (_, index) => ({
@ -529,27 +529,27 @@ describe('local scan enrichment', () => {
],
})),
};
let activeColumnSamples = 0;
let maxActiveColumnSamples = 0;
let activeTableSamples = 0;
let maxActiveTableSamples = 0;
const scanConnector = {
...connector(),
introspect: vi.fn(async () => concurrentSnapshot),
sampleColumn: vi.fn(async () => {
activeColumnSamples += 1;
maxActiveColumnSamples = Math.max(maxActiveColumnSamples, activeColumnSamples);
sampleColumn: vi.fn(async () => ({
values: ['1'],
nullCount: 0,
distinctCount: 1,
})),
sampleTable: vi.fn(async () => {
activeTableSamples += 1;
maxActiveTableSamples = Math.max(maxActiveTableSamples, activeTableSamples);
await new Promise((resolve) => setTimeout(resolve, 10));
activeColumnSamples -= 1;
activeTableSamples -= 1;
return {
values: ['1'],
nullCount: 0,
distinctCount: 1,
headers: ['id'],
rows: [[1]],
totalRows: 1,
};
}),
sampleTable: vi.fn(async () => ({
headers: ['id'],
rows: [[1]],
totalRows: 1,
})),
};
const settings = {
...buildDefaultKtxProjectConfig().scan.relationships,
@ -565,7 +565,8 @@ describe('local scan enrichment', () => {
relationshipSettings: settings,
});
expect(maxActiveColumnSamples).toBe(6);
expect(maxActiveTableSamples).toBe(4);
expect(scanConnector.sampleColumn).not.toHaveBeenCalled();
});
it('reports enrichment progress for countable stages', async () => {
@ -707,7 +708,7 @@ describe('local scan enrichment', () => {
providerIdentity: { provider: 'fake', embeddingDimensions: 6 },
});
const generateText = vi.spyOn(providers.llmRuntime, 'generateText');
const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject');
const embedBatch = vi.spyOn(providers.embedding, 'embedBatch');
const second = await runLocalScanEnrichment({
connectionId: 'warehouse',
@ -725,7 +726,7 @@ describe('local scan enrichment', () => {
expect(first.state.resumedStages).toEqual([]);
expect(second.state.resumedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(second.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(generateText).not.toHaveBeenCalled();
expect(generateObject).not.toHaveBeenCalled();
expect(embedBatch).not.toHaveBeenCalled();
expect(second.descriptionUpdates).toEqual(first.descriptionUpdates);
expect(second.embeddingUpdates).toEqual(first.embeddingUpdates);
@ -763,7 +764,7 @@ describe('local scan enrichment', () => {
tables: [{ ...firstTable, name: 'customers' }],
})),
};
const generateText = vi.spyOn(providers.llmRuntime, 'generateText');
const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject');
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
@ -779,7 +780,7 @@ describe('local scan enrichment', () => {
expect(result.state.resumedStages).toEqual([]);
expect(result.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']);
expect(generateText).toHaveBeenCalled();
expect(generateObject).toHaveBeenCalled();
});
it('runs providerless enriched scans as relationship-only discovery enrichment', async () => {

View file

@ -1,7 +1,7 @@
import pLimit from 'p-limit';
import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js';
import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js';
import { KtxDescriptionGenerator } from './description-generation.js';
import { buildKtxColumnEmbeddingText } from './embedding-text.js';
import {
completedKtxScanEnrichmentStateSummary,
@ -41,7 +41,7 @@ import type {
KtxTableRef,
} from './types.js';
const DESCRIPTION_TABLE_CONCURRENCY = 6;
const DESCRIPTION_TABLE_CONCURRENCY = 4;
export interface KtxLocalScanEnrichmentProviders {
llmRuntime: KtxLlmRuntimePort;
@ -180,7 +180,17 @@ function deterministicLlmRuntime(): KtxLlmRuntimePort {
async generateText(input) {
return `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'data source'}`;
},
async generateObject() {
async generateObject(input) {
if (input.prompt.includes('Sample rows:')) {
const columns = Array.from(input.prompt.matchAll(/^- ([^\s(]+)/gm), (match) => ({
name: match[1] ?? 'column',
description: `Deterministic description for ${match[1] ?? 'column'}`,
}));
return {
tableDescription: `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'table'}`,
columns,
} as never;
}
return { pkCandidates: [], fkCandidates: [] } as never;
},
async runAgentLoop() {
@ -235,30 +245,6 @@ export function snapshotToKtxEnrichedSchema(
};
}
/**
 * Converts a schema table into the column-table shape used for description generation.
 *
 * Columns with a non-empty comment carry that comment both as their only sample value
 * and as their `db` raw description; columns without one carry just their name.
 */
function descriptionTable(table: KtxSchemaTable): KtxDescriptionColumnTable {
  const { catalog, db, name } = table;
  const columns = table.columns.map((schemaColumn) => {
    const comment = schemaColumn.comment;
    if (!comment) {
      return { name: schemaColumn.name };
    }
    return {
      name: schemaColumn.name,
      sampleValues: [comment],
      rawDescriptions: { db: comment },
    };
  });
  return { catalog, db, name, columns };
}
/**
 * Projects a schema table's columns onto the metadata fields used in table prompts:
 * name, native type, and comment. Missing type or comment values are normalized to `null`.
 */
function tableMetadataColumns(table: KtxSchemaTable): Array<{
  name: string;
  nativeType?: string | null;
  comment?: string | null;
}> {
  const metadata: Array<{ name: string; nativeType?: string | null; comment?: string | null }> = [];
  for (const schemaColumn of table.columns) {
    const { name, nativeType, comment } = schemaColumn;
    metadata.push({
      name,
      nativeType: nativeType ?? null,
      comment: comment ?? null,
    });
  }
  return metadata;
}
/**
 * Returns the embedding batch size to use: `maxBatchSize` when it is a positive
 * integer, otherwise the default of 100.
 */
function embeddingBatchSize(maxBatchSize: number): number {
  const isUsable = Number.isInteger(maxBatchSize) && maxBatchSize > 0;
  if (!isUsable) {
    return 100;
  }
  return maxBatchSize;
}
@ -307,32 +293,28 @@ async function generateDescriptions(input: {
transient: true,
},
);
const tableInput = descriptionTable(table);
const columnResult = await generator.generateColumnDescriptions({
const batched = await generator.generateBatchedTableDescriptions({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis,
table: tableInput,
});
const tableDescription = await generator.generateTableDescription({
connectionId: input.snapshot.connectionId,
connector: input.connector,
context: input.context,
dataSourceType: input.snapshot.driver,
table: {
catalog: table.catalog,
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
columns: tableMetadataColumns(table),
columns: table.columns.map((column) => ({
name: column.name,
type: column.nativeType,
rawDescriptions: column.comment ? { db: column.comment } : {},
})),
},
});
return {
table: tableRef(table),
tableDescription,
columnDescriptions: Object.fromEntries(columnResult.columnDescriptions),
tableDescription: batched.tableDescription,
columnDescriptions: Object.fromEntries(batched.columnDescriptions),
};
}),
),