From a57deb670b1888cb56a49bc47b5fb072a6103f72 Mon Sep 17 00:00:00 2001 From: Andrey Avtomonov Date: Fri, 22 May 2026 17:03:58 +0200 Subject: [PATCH] feat(scan): batch table description generation --- .../scan/description-generation.test.ts | 114 +++++++ .../context/scan/description-generation.ts | 287 ++++++++++++++++-- .../src/context/scan/local-enrichment.test.ts | 41 +-- .../cli/src/context/scan/local-enrichment.ts | 60 ++-- 4 files changed, 425 insertions(+), 77 deletions(-) diff --git a/packages/cli/src/context/scan/description-generation.test.ts b/packages/cli/src/context/scan/description-generation.test.ts index e47d32be..a8678c81 100644 --- a/packages/cli/src/context/scan/description-generation.test.ts +++ b/packages/cli/src/context/scan/description-generation.test.ts @@ -378,6 +378,120 @@ describe('KtxDescriptionGenerator', () => { expect(cache.set).toHaveBeenCalledWith('warehouse.public.orders', 'Commerce orders'); expect(cache.set).toHaveBeenCalledWith('__connection:Warehouse', 'Commerce orders'); }); + + it('generates one structured table description and reuses table samples for all columns', async () => { + const llmRuntime = createLlmProvider('unused'); + llmRuntime.generateObject = vi.fn(async () => ({ + tableDescription: 'Commerce orders', + columns: [ + { name: 'status', description: 'Current order state' }, + { name: 'amount', description: 'Order amount in dollars' }, + ], + })); + const connector = createConnector(); + const generator = new KtxDescriptionGenerator({ + llmRuntime, + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateBatchedTableDescriptions({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + rawDescriptions: { db: 'Orders fact table' }, + columns: [ + { name: 'status', type: 'text' }, + { name: 'amount', type: 'numeric' }, + ], + }, + }); + + expect(result.tableDescription).toBe('Commerce orders'); + expect(Object.fromEntries(result.columnDescriptions)).toEqual({ + status: 'Current order state', + amount: 'Order amount in dollars', + }); + expect(connector.sampleTable).toHaveBeenCalledTimes(1); + expect(connector.sampleColumn).not.toHaveBeenCalled(); + expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1); + expect(llmRuntime.generateText).not.toHaveBeenCalled(); + }); + + it('falls back to one column generateText call for each missing structured column', async () => { + const llmRuntime = createLlmProvider('Fallback status'); + llmRuntime.generateObject = vi.fn(async () => ({ + tableDescription: 'Commerce orders', + columns: [{ name: 'amount', description: 'Order amount in dollars' }], + })); + const connector = createConnector(); + const generator = new KtxDescriptionGenerator({ + llmRuntime, + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateBatchedTableDescriptions({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [ + { name: 'status', type: 'text' }, + { name: 'amount', type: 'numeric' }, + ], + }, + }); + + expect(Object.fromEntries(result.columnDescriptions)).toEqual({ + status: 'Fallback status', + amount: 'Order amount in dollars', + }); + expect(connector.sampleColumn).not.toHaveBeenCalled(); + expect(llmRuntime.generateObject).toHaveBeenCalledTimes(1); + expect(llmRuntime.generateText).toHaveBeenCalledTimes(1); + }); + + it('tolerates structured object failures and falls back to prepared column values', async () => { + const llmRuntime = createLlmProvider('Fallback description'); + llmRuntime.generateObject = vi.fn(async () => { + throw new Error('object output unavailable'); + }); + const warnings: string[] = []; + const generator = new KtxDescriptionGenerator({ + llmRuntime, + onWarning: (warning) => warnings.push(warning.code), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateBatchedTableDescriptions({ + connectionId: 'conn-1', + connector: createConnector(), + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [{ name: 'status', type: 'text' }], + }, + }); + + expect(result.tableDescription).toBeNull(); + expect(Object.fromEntries(result.columnDescriptions)).toEqual({ status: 'Fallback description' }); + expect(warnings).toContain('enrichment_failed'); + expect(llmRuntime.generateText).toHaveBeenCalledTimes(1); + }); }); describe('KtxDescriptionGenerator resilience', () => { diff --git a/packages/cli/src/context/scan/description-generation.ts b/packages/cli/src/context/scan/description-generation.ts index 4526215d..485f2b21 100644 --- a/packages/cli/src/context/scan/description-generation.ts +++ b/packages/cli/src/context/scan/description-generation.ts @@ -1,4 +1,5 @@ import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js'; +import { z } from 'zod'; import type { KtxColumnSampleInput, KtxColumnSampleResult, @@ -53,7 +54,7 @@ export interface KtxDescriptionColumn { sampleValues?: unknown[]; } -export interface KtxDescriptionColumnTable extends KtxTableRef { +interface KtxDescriptionColumnTable extends KtxTableRef { columns: KtxDescriptionColumn[]; } @@ -112,6 +113,23 @@ export interface KtxGenerateTableDescriptionInput { table: KtxDescriptionTableInput; } +export interface KtxGenerateBatchedTableDescriptionsInput { + connectionId: string; + connector: KtxDescriptionSamplingPort; + context: KtxScanContext; + dataSourceType: string; + supportsNestedAnalysis: boolean; + table: KtxDescriptionColumnTable & { + rawDescriptions?: Record; + columns: Array; + }; +} + +export interface KtxBatchedTableDescriptionsResult { + tableDescription: string | null; + columnDescriptions: Map; +} + export interface KtxGenerateDataSourceDescriptionInput { connectionId: string; connector: KtxDescriptionSamplingPort; @@ -136,6 +154,18 @@ interface ColumnTaskResult { skipped: boolean; } +const batchedTableDescriptionSchema = z.object({ + tableDescription: z.string(), + columns: z.array( + z.object({ + name: z.string(), + description: z.string(), + }), + ), +}); + +type BatchedTableDescriptionOutput = z.infer; + function descriptionSources(rawDescriptions: Record | undefined): Array<[string, string]> { if (!rawDescriptions) { return []; @@ -250,6 +280,76 @@ function wordLimitLine(maxWords: number): string { return `Please provide a concise description in ${maxWords} words or less.`; } +function sampleValuesByColumn( + columns: readonly KtxDescriptionColumn[], + sampleData: KtxTableSampleResult | null, +): Map { + const values = new Map(); + for (const column of columns) { + const existingValues = column.sampleValues?.filter((value) => value !== null && value !== undefined) ?? []; + if (existingValues.length > 0) { + values.set(column.name, existingValues); + } + } + if (!sampleData) { + return values; + } + for (const column of columns) { + const index = sampleData.headers.findIndex((header) => header.toLowerCase() === column.name.toLowerCase()); + if (index < 0) { + continue; + } + const sampledValues = sampleData.rows + .map((row) => row[index]) + .filter((value) => value !== null && value !== undefined); + if (sampledValues.length > 0) { + values.set(column.name, sampledValues); + } + } + return values; +} + +function batchedPrompt(input: { + table: KtxGenerateBatchedTableDescriptionsInput['table']; + sampleData: KtxTableSampleResult | null; + dataSourceType: string; + tableMaxWords: number; + columnMaxWords: number; +}): KtxDescriptionPrompt { + const columnLines = input.table.columns + .map((column) => { + const typePart = column.type ? ` (${column.type})` : ''; + const commentPart = column.rawDescriptions?.db ? ` - ${column.rawDescriptions.db}` : ''; + return `- ${column.name}${typePart}${commentPart}`; + }) + .join('\n'); + const sampleLines = + input.sampleData && input.sampleData.rows.length > 0 + ? input.sampleData.rows + .slice(0, 5) + .map((row) => + input.sampleData!.headers.map((header, index) => `${header}=${String(row[index] ?? '')}`).join(', '), + ) + .join('\n') + : 'unavailable'; + return { + system: [ + 'Analyze one database table and return structured JSON matching the supplied schema.', + `The table description must be ${input.tableMaxWords} words or less.`, + `Each column description must be ${input.columnMaxWords} words or less.`, + 'Describe business meaning directly. Do not repeat table or column names as filler.', + ].join('\n'), + user: [ + `Table: ${input.table.name}`, + `Data source type: ${input.dataSourceType}`, + 'Columns:', + columnLines, + 'Sample rows:', + sampleLines, + ].join('\n'), + }; +} + /** @internal */ export function buildKtxColumnDescriptionPrompt( input: KtxColumnDescriptionPromptInput & { maxWords?: number }, @@ -582,6 +682,147 @@ export class KtxDescriptionGenerator { } } + async generateBatchedTableDescriptions( + input: KtxGenerateBatchedTableDescriptionsInput, + ): Promise { + const tableRef = toTableRef(input.table); + let sampleData: KtxTableSampleResult | null = null; + let fallbackReason: 'capability_missing' | 'sampling_failed' | 'empty_sample' | null = null; + if (!input.connector.sampleTable) { + fallbackReason = 'capability_missing'; + this.logger?.warn('KTX scan connector does not support table sampling; falling back to metadata-only prompt', { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'connector_capability_missing', + message: `Connector ${input.connector.id} does not support sampleTable; using metadata-only description prompt`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, capability: 'sampleTable' }, + }); + } else { + try { + sampleData = await retryAsync( + () => + input.connector.sampleTable!( + { + connectionId: input.connectionId, + table: tableRef, + limit: 20, + }, + input.context, + ), + { + attempts: 3, + baseDelayMs: 200, + signal: input.context.signal, + onAttemptFailure: (error, attempt) => { + this.logger?.warn(`sampleTable attempt ${attempt} failed for ${input.table.name}: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + attempt, + }); + }, + }, + ); + if (sampleData.rows.length === 0) { + fallbackReason = 'empty_sample'; + this.logger?.warn('sampleTable returned no rows; using metadata-only prompt', { + connectorId: input.connector.id, + table: input.table.name, + }); + } + } catch (error) { + if (error instanceof KtxAbortedError) { + throw error; + } + fallbackReason = 'sampling_failed'; + this.logger?.error(`sampleTable exhausted retries for ${input.table.name}: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'sampling_failed', + message: `Failed to sample table ${input.table.name} after retries: ${errorMessage(error)}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, error: errorMessage(error) }, + }); + } + } + + const sampleValues = sampleValuesByColumn(input.table.columns, sampleData); + const descriptions = new Map(); + let tableDescription: string | null = null; + + try { + const prompt = batchedPrompt({ + table: input.table, + sampleData, + dataSourceType: input.dataSourceType, + tableMaxWords: this.settings.tableMaxWords, + columnMaxWords: this.settings.columnMaxWords, + }); + const generated = await this.llmRuntime.generateObject< + BatchedTableDescriptionOutput, + typeof batchedTableDescriptionSchema + >({ + role: 'candidateExtraction', + system: prompt.system, + prompt: prompt.user, + schema: batchedTableDescriptionSchema, + temperature: this.settings.temperature, + }); + tableDescription = generated.tableDescription.trim() || null; + const generatedColumns = new Map( + generated.columns.map((column) => [column.name.toLowerCase(), column.description.trim() || null]), + ); + for (const column of input.table.columns) { + const description = generatedColumns.get(column.name.toLowerCase()) ?? null; + descriptions.set(column.name, description); + } + if (tableDescription && fallbackReason !== null) { + this.onWarning?.({ + code: 'description_fallback_used', + message: `Generated table description without sample rows for ${input.table.name} (reason: ${fallbackReason})`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, reason: fallbackReason }, + }); + } + } catch (error) { + this.logger?.warn(`Batched table description failed for ${input.table.name}: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'enrichment_failed', + message: `Failed to generate batched description for table ${input.table.name}: ${errorMessage(error)}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id }, + }); + } + + const tableContext = `Table: ${input.table.name} | Columns: ${input.table.columns.map((column) => column.name).join(', ')} | Data source: ${input.dataSourceType}`; + for (const column of input.table.columns) { + if (descriptions.get(column.name)) { + continue; + } + const fallback = await this.generateColumnDescriptionFromPreparedValues({ + column, + columnValues: sampleValues.get(column.name) ?? [], + tableContext, + dataSourceType: input.dataSourceType, + supportsNestedAnalysis: input.supportsNestedAnalysis, + }); + descriptions.set(column.name, fallback); + } + + return { tableDescription, columnDescriptions: descriptions }; + } + async generateDataSourceDescription(input: KtxGenerateDataSourceDescriptionInput): Promise { if (input.tables.length === 0) { return 'No tables found in database'; @@ -732,27 +973,13 @@ export class KtxDescriptionGenerator { } } - const nonNullValues = (columnValues ?? []).filter((value) => value !== null && value !== undefined); - const hasRawDescriptions = descriptionSources(column.rawDescriptions).length > 0; - if (nonNullValues.length === 0 && !hasRawDescriptions) { - return { - columnName: column.name, - description: null, - skipped: false, - processed: false, - }; - } - - const prompt = buildKtxColumnDescriptionPrompt({ - columnName: column.name, - columnValues: nonNullValues, + const description = await this.generateColumnDescriptionFromPreparedValues({ + column, + columnValues: columnValues ?? [], tableContext, dataSourceType: input.dataSourceType, supportsNestedAnalysis: input.supportsNestedAnalysis, - rawDescriptions: column.rawDescriptions, - maxWords: this.settings.columnMaxWords, }); - const description = await this.generateAiDescription(prompt, 'ktx-column-description'); if (cacheKey && description) { await this.cache?.set(cacheKey, description); @@ -782,6 +1009,30 @@ export class KtxDescriptionGenerator { } } + private async generateColumnDescriptionFromPreparedValues(input: { + column: KtxDescriptionColumn; + columnValues: unknown[]; + tableContext: string; + dataSourceType: string; + supportsNestedAnalysis: boolean; + }): Promise { + const nonNullValues = input.columnValues.filter((value) => value !== null && value !== undefined); + const hasRawDescriptions = descriptionSources(input.column.rawDescriptions).length > 0; + if (nonNullValues.length === 0 && !hasRawDescriptions) { + return null; + } + const prompt = buildKtxColumnDescriptionPrompt({ + columnName: input.column.name, + columnValues: nonNullValues, + tableContext: input.tableContext, + dataSourceType: input.dataSourceType, + supportsNestedAnalysis: input.supportsNestedAnalysis, + rawDescriptions: input.column.rawDescriptions, + maxWords: this.settings.columnMaxWords, + }); + return this.generateAiDescription(prompt, 'ktx-column-description'); + } + private async generateAiDescription(prompt: KtxDescriptionPrompt, _operationName: string): Promise { try { const text = await this.llmRuntime.generateText({ diff --git a/packages/cli/src/context/scan/local-enrichment.test.ts b/packages/cli/src/context/scan/local-enrichment.test.ts index 90ddae78..9647c8b9 100644 --- a/packages/cli/src/context/scan/local-enrichment.test.ts +++ b/packages/cli/src/context/scan/local-enrichment.test.ts @@ -505,7 +505,7 @@ describe('local scan enrichment', () => { expect(result.relationships).toEqual({ accepted: 0, review: 1, rejected: 0, skipped: 0 }); }); - it('generates table descriptions with bounded table-level concurrency', async () => { + it('generates batched table descriptions with bounded table-level concurrency', async () => { const concurrentSnapshot: KtxSchemaSnapshot = { ...snapshot, tables: Array.from({ length: 8 }, (_, index) => ({ @@ -529,27 +529,27 @@ describe('local scan enrichment', () => { ], })), }; - let activeColumnSamples = 0; - let maxActiveColumnSamples = 0; + let activeTableSamples = 0; + let maxActiveTableSamples = 0; const scanConnector = { ...connector(), introspect: vi.fn(async () => concurrentSnapshot), - sampleColumn: vi.fn(async () => { - activeColumnSamples += 1; - maxActiveColumnSamples = Math.max(maxActiveColumnSamples, activeColumnSamples); + sampleColumn: vi.fn(async () => ({ + values: ['1'], + nullCount: 0, + distinctCount: 1, + })), + sampleTable: vi.fn(async () => { + activeTableSamples += 1; + maxActiveTableSamples = Math.max(maxActiveTableSamples, activeTableSamples); await new Promise((resolve) => setTimeout(resolve, 10)); - activeColumnSamples -= 1; + activeTableSamples -= 1; return { - values: ['1'], - nullCount: 0, - distinctCount: 1, + headers: ['id'], + rows: [[1]], + totalRows: 1, }; }), - sampleTable: vi.fn(async () => ({ - headers: ['id'], - rows: [[1]], - totalRows: 1, - })), }; const settings = { ...buildDefaultKtxProjectConfig().scan.relationships, @@ -565,7 +565,8 @@ describe('local scan enrichment', () => { relationshipSettings: settings, }); - expect(maxActiveColumnSamples).toBe(6); + expect(maxActiveTableSamples).toBe(4); + expect(scanConnector.sampleColumn).not.toHaveBeenCalled(); }); it('reports enrichment progress for countable stages', async () => { @@ -707,7 +708,7 @@ describe('local scan enrichment', () => { providerIdentity: { provider: 'fake', embeddingDimensions: 6 }, }); - const generateText = vi.spyOn(providers.llmRuntime, 'generateText'); + const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject'); const embedBatch = vi.spyOn(providers.embedding, 'embedBatch'); const second = await runLocalScanEnrichment({ connectionId: 'warehouse', @@ -725,7 +726,7 @@ describe('local scan enrichment', () => { expect(first.state.resumedStages).toEqual([]); expect(second.state.resumedStages).toEqual(['descriptions', 'embeddings', 'relationships']); expect(second.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']); - expect(generateText).not.toHaveBeenCalled(); + expect(generateObject).not.toHaveBeenCalled(); expect(embedBatch).not.toHaveBeenCalled(); expect(second.descriptionUpdates).toEqual(first.descriptionUpdates); expect(second.embeddingUpdates).toEqual(first.embeddingUpdates); @@ -763,7 +764,7 @@ describe('local scan enrichment', () => { tables: [{ ...firstTable, name: 'customers' }], })), }; - const generateText = vi.spyOn(providers.llmRuntime, 'generateText'); + const generateObject = vi.spyOn(providers.llmRuntime, 'generateObject'); const result = await runLocalScanEnrichment({ connectionId: 'warehouse', @@ -779,7 +780,7 @@ describe('local scan enrichment', () => { expect(result.state.resumedStages).toEqual([]); expect(result.state.completedStages).toEqual(['descriptions', 'embeddings', 'relationships']); - expect(generateText).toHaveBeenCalled(); + expect(generateObject).toHaveBeenCalled(); }); it('runs providerless enriched scans as relationship-only discovery enrichment', async () => { diff --git a/packages/cli/src/context/scan/local-enrichment.ts b/packages/cli/src/context/scan/local-enrichment.ts index 32807532..a175c7fa 100644 --- a/packages/cli/src/context/scan/local-enrichment.ts +++ b/packages/cli/src/context/scan/local-enrichment.ts @@ -1,7 +1,7 @@ import pLimit from 'p-limit'; import type { KtxLlmRuntimePort } from '../../context/llm/runtime-port.js'; import { buildDefaultKtxProjectConfig, type KtxScanRelationshipConfig } from '../project/config.js'; -import { type KtxDescriptionColumnTable, KtxDescriptionGenerator } from './description-generation.js'; +import { KtxDescriptionGenerator } from './description-generation.js'; import { buildKtxColumnEmbeddingText } from './embedding-text.js'; import { completedKtxScanEnrichmentStateSummary, @@ -41,7 +41,7 @@ import type { KtxTableRef, } from './types.js'; -const DESCRIPTION_TABLE_CONCURRENCY = 6; +const DESCRIPTION_TABLE_CONCURRENCY = 4; export interface KtxLocalScanEnrichmentProviders { llmRuntime: KtxLlmRuntimePort; @@ -180,7 +180,17 @@ function deterministicLlmRuntime(): KtxLlmRuntimePort { async generateText(input) { return `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'data source'}`; }, - async generateObject() { + async generateObject(input) { + if (input.prompt.includes('Sample rows:')) { + const columns = Array.from(input.prompt.matchAll(/^- ([^\s(]+)/gm), (match) => ({ + name: match[1] ?? 'column', + description: `Deterministic description for ${match[1] ?? 'column'}`, + })); + return { + tableDescription: `Deterministic description for ${input.prompt.slice(0, 64).trim() || 'table'}`, + columns, + } as never; + } return { pkCandidates: [], fkCandidates: [] } as never; }, async runAgentLoop() { @@ -235,30 +245,6 @@ export function snapshotToKtxEnrichedSchema( }; } -function descriptionTable(table: KtxSchemaTable): KtxDescriptionColumnTable { - return { - catalog: table.catalog, - db: table.db, - name: table.name, - columns: table.columns.map((column) => ({ - name: column.name, - ...(column.comment ? { sampleValues: [column.comment], rawDescriptions: { db: column.comment } } : {}), - })), - }; -} - -function tableMetadataColumns(table: KtxSchemaTable): Array<{ - name: string; - nativeType?: string | null; - comment?: string | null; -}> { - return table.columns.map((column) => ({ - name: column.name, - nativeType: column.nativeType ?? null, - comment: column.comment ?? null, - })); -} - function embeddingBatchSize(maxBatchSize: number): number { return Number.isInteger(maxBatchSize) && maxBatchSize > 0 ? maxBatchSize : 100; } @@ -307,32 +293,28 @@ async function generateDescriptions(input: { transient: true, }, ); - const tableInput = descriptionTable(table); - const columnResult = await generator.generateColumnDescriptions({ + const batched = await generator.generateBatchedTableDescriptions({ connectionId: input.snapshot.connectionId, connector: input.connector, context: input.context, dataSourceType: input.snapshot.driver, supportsNestedAnalysis: input.connector.capabilities.nestedAnalysis, - table: tableInput, - }); - const tableDescription = await generator.generateTableDescription({ - connectionId: input.snapshot.connectionId, - connector: input.connector, - context: input.context, - dataSourceType: input.snapshot.driver, table: { catalog: table.catalog, db: table.db, name: table.name, rawDescriptions: table.comment ? { db: table.comment } : {}, - columns: tableMetadataColumns(table), + columns: table.columns.map((column) => ({ + name: column.name, + type: column.nativeType, + rawDescriptions: column.comment ? { db: column.comment } : {}, + })), }, }); return { table: tableRef(table), - tableDescription, - columnDescriptions: Object.fromEntries(columnResult.columnDescriptions), + tableDescription: batched.tableDescription, + columnDescriptions: Object.fromEntries(batched.columnDescriptions), }; }), ),