/**
 * Buckets scan warnings by their `code`.
 *
 * The returned `Map` iterates in first-seen order of each code, and every
 * bucket keeps its warnings in the order they appeared in `warnings`.
 *
 * @param warnings - Warnings collected for a scan report; not mutated.
 * @returns A map from warning code to the warnings that carry that code.
 */
function groupWarningsByCode(warnings: readonly KtxScanWarning[]): Map<string, KtxScanWarning[]> {
  const groups = new Map<string, KtxScanWarning[]>();
  for (const warning of warnings) {
    const list = groups.get(warning.code);
    if (list) {
      list.push(warning);
    } else {
      groups.set(warning.code, [warning]);
    }
  }
  return groups;
}

/**
 * Produces the one-line, human-readable summary for a group of warnings that
 * share the same `code`.
 *
 * Provider/backend availability codes yield a fixed sentence that does not
 * mention `count`; unknown codes fall back to a generic `"<count> warnings (<code>)"`
 * line so new warning codes are still surfaced.
 *
 * NOTE(review): relies on the file-level `plural(count, word)` helper; it is
 * presumed to return the singular word for a count of 1 — confirm.
 *
 * @param code - Warning code shared by the group.
 * @param count - Number of warnings in the group.
 * @returns The summary sentence for the group.
 */
function describeWarningGroup(code: string, count: number): string {
  switch (code) {
    case 'sampling_failed':
      return `${count} ${plural(count, 'table')} could not be sampled (retries exhausted); descriptions used metadata-only fallback or were skipped.`;
    case 'description_fallback_used':
      return `${count} ${plural(count, 'table')} got an AI description from column metadata only (no sample rows available).`;
    case 'enrichment_failed':
      return `${count} ${plural(count, 'table/column')} could not be enriched.`;
    case 'connector_capability_missing':
      return `${count} ${plural(count, 'table')} affected by missing connector capability.`;
    case 'statistics_failed':
      return `${count} statistics ${plural(count, 'lookup')} failed.`;
    case 'llm_unavailable':
      return 'LLM provider unavailable; AI enrichment was skipped.';
    case 'embedding_unavailable':
      return 'Embedding provider unavailable; embeddings were skipped.';
    case 'relationship_validation_failed':
      return `${count} relationship ${plural(count, 'validation')} could not run.`;
    case 'relationship_llm_invalid_reference':
      return `${count} LLM-proposed ${plural(count, 'relationship')} referenced unknown columns.`;
    case 'relationship_llm_proposal_failed':
      return `${count} LLM relationship ${plural(count, 'proposal')} failed.`;
    case 'scan_enrichment_backend_not_configured':
      return 'Scan enrichment backend is not configured; AI stages were skipped.';
    case 'credential_redacted': {
      // Keep the verb in agreement with the noun: "1 credential was", "2 credentials were".
      const verb = count === 1 ? 'was' : 'were';
      return `${count} ${plural(count, 'credential')} ${verb} redacted from scan output.`;
    }
    default:
      return `${count} ${plural(count, 'warning')} (${code})`;
  }
}
`, …` : ''; + io.stdout.write(` also: ${moreTables.join(', ')}${suffix}\n`); + } + } } } if (report.capabilityGaps.length > 0) { diff --git a/packages/context/src/agent/agent-runner.service.ts b/packages/context/src/agent/agent-runner.service.ts index 11a0715c..128818f9 100644 --- a/packages/context/src/agent/agent-runner.service.ts +++ b/packages/context/src/agent/agent-runner.service.ts @@ -1,4 +1,4 @@ -import { KtxMessageBuilder, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm'; +import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm'; import { generateText, stepCountIs, type TelemetrySettings, type Tool } from 'ai'; import { noopLogger, type KtxLogger } from '../core/index.js'; import { summarizeKtxLlmDebugRequest, type KtxLlmDebugRequestRecorder } from '../llm/index.js'; @@ -36,14 +36,6 @@ export interface AgentRunnerServiceDeps { logger?: KtxLogger; } -function splitSystemPromptMessages(messages: ReturnType['messages']) { - const systemMessages = messages.filter((message) => message.role === 'system'); - return { - system: systemMessages.length === 0 ? undefined : systemMessages.length === 1 ? 
systemMessages[0] : systemMessages, - messages: messages.filter((message) => message.role !== 'system'), - }; -} - export class AgentRunnerService { private readonly logger: KtxLogger; @@ -62,7 +54,7 @@ export class AgentRunnerService { tools: params.toolSet, model, }); - const promptMessages = splitSystemPromptMessages(built.messages); + const promptMessages = splitKtxSystemMessages(built.messages); await this.deps.debugRequestRecorder?.record( summarizeKtxLlmDebugRequest({ diff --git a/packages/context/src/ingest/page-triage/page-triage.service.test.ts b/packages/context/src/ingest/page-triage/page-triage.service.test.ts index 2fa367aa..4fd57c42 100644 --- a/packages/context/src/ingest/page-triage/page-triage.service.test.ts +++ b/packages/context/src/ingest/page-triage/page-triage.service.test.ts @@ -227,9 +227,10 @@ describe('PageTriageService', () => { }); generateTextMock .mockImplementationOnce((args: any) => { - const systemMessage = args.messages.find((m: { role: string }) => m.role === 'system'); + const systemMessage = args.system ?? args.messages.find((m: { role: string }) => m.role === 'system'); const userMessage = args.messages.find((m: { role: string }) => m.role === 'user'); - const systemText = systemMessage.content as string; + const systemText = + typeof systemMessage === 'string' ? 
systemMessage : (systemMessage.content as string); const userText = userMessage.content as string; expect(systemText).toContain( 'Reusable templates and scripts are durable knowledge regardless of subject matter.', diff --git a/packages/context/src/ingest/page-triage/page-triage.service.ts b/packages/context/src/ingest/page-triage/page-triage.service.ts index f4e6e65f..765b4c21 100644 --- a/packages/context/src/ingest/page-triage/page-triage.service.ts +++ b/packages/context/src/ingest/page-triage/page-triage.service.ts @@ -1,7 +1,7 @@ import { createHash } from 'node:crypto'; import { readdir, readFile } from 'node:fs/promises'; import { dirname, join, relative } from 'node:path'; -import { KtxMessageBuilder, type KtxLlmProvider } from '@ktx/llm'; +import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider } from '@ktx/llm'; import { generateText, type ToolSet } from 'ai'; import pLimit from 'p-limit'; import { z } from 'zod'; @@ -346,10 +346,12 @@ export class PageTriageService { tools: {}, model, }); + const split = splitKtxSystemMessages(built.messages); const result = await this.runGenerateText({ model, temperature: 0, - messages: built.messages, + ...(split.system ? { system: split.system } : {}), + messages: split.messages, tools: built.tools as ToolSet, }); return result.text; diff --git a/packages/context/src/llm/generation.ts b/packages/context/src/llm/generation.ts index 1bbdbcab..7cb11d58 100644 --- a/packages/context/src/llm/generation.ts +++ b/packages/context/src/llm/generation.ts @@ -1,4 +1,4 @@ -import { KtxMessageBuilder, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm'; +import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm'; import { generateText, Output, type FlexibleSchema, type ToolSet } from 'ai'; type GenerateTextInput = Parameters[0]; @@ -29,10 +29,12 @@ export async function generateKtxText(input: GenerateKtxTextInput): Promise( tools: input.tools ?? 
{}, model, }); + const split = splitKtxSystemMessages(built.messages); const result = await (input.generateText ?? generateText)({ model, temperature: input.temperature ?? 0, - messages: built.messages, + ...(split.system ? { system: split.system } : {}), + messages: split.messages, tools: built.tools as ToolSet, ...(hasTools(built.tools as ToolSet) ? { diff --git a/packages/context/src/scan/description-generation.test.ts b/packages/context/src/scan/description-generation.test.ts index bd9621df..8ffd3b5c 100644 --- a/packages/context/src/scan/description-generation.test.ts +++ b/packages/context/src/scan/description-generation.test.ts @@ -203,11 +203,11 @@ describe('KtxDescriptionGenerator', () => { expect(generateText).toHaveBeenCalledWith( expect.objectContaining({ temperature: 0.2, + system: expect.objectContaining({ + role: 'system', + content: expect.stringContaining('Please provide a concise description in 12 words or less.'), + }), messages: expect.arrayContaining([ - expect.objectContaining({ - role: 'system', - content: expect.stringContaining('Please provide a concise description in 12 words or less.'), - }), expect.objectContaining({ role: 'user', content: expect.stringContaining(' status '), @@ -215,6 +215,8 @@ describe('KtxDescriptionGenerator', () => { ]), }), ); + const lastCall = vi.mocked(generateText).mock.calls.at(-1)?.[0]; + expect(lastCall?.messages?.some((message) => message.role === 'system')).toBe(false); }); it('samples through the connector when column values are not pre-fetched', async () => { @@ -391,3 +393,289 @@ describe('KtxDescriptionGenerator', () => { expect(cache.set).toHaveBeenCalledWith('__connection:Warehouse', 'Commerce orders'); }); }); + +describe('KtxDescriptionGenerator resilience', () => { + function createLogger() { + return { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + } + + it('retries sampleTable on transient failure and uses sampled rows when it eventually succeeds', async () => { + 
const sampleTable = vi + .fn>() + .mockRejectedValueOnce(new Error('pool: transient ECONNRESET')) + .mockRejectedValueOnce(new Error('pool: transient ECONNRESET')) + .mockResolvedValue({ + headers: ['id', 'status'], + rows: [ + [1, 'paid'], + [2, 'refunded'], + ], + totalRows: 2, + }); + const connector: KtxScanConnector = { + ...createConnector(), + sampleTable, + }; + const logger = createLogger(); + const warnings: Array<{ code: string; table?: string }> = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Commerce orders'), + logger, + onWarning: (warning) => warnings.push({ code: warning.code, ...(warning.table ? { table: warning.table } : {}) }), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24, concurrencyLimit: 2 }, + }); + + const description = await generator.generateTableDescription({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + table: { catalog: null, db: 'public', name: 'orders' }, + }); + + expect(description).toBe('Commerce orders'); + expect(sampleTable).toHaveBeenCalledTimes(3); + expect(logger.warn).toHaveBeenCalledTimes(2); + expect(warnings).toEqual([]); + }); + + it('falls back to metadata-only prompt when sampleTable retries exhaust', async () => { + const sampleTable = vi + .fn>() + .mockRejectedValue(new Error('pool: connection refused')); + const connector: KtxScanConnector = { + ...createConnector(), + sampleTable, + }; + const logger = createLogger(); + const warnings: Array<{ code: string; table?: string; metadata?: Record }> = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Customer reference data'), + logger, + onWarning: (warning) => + warnings.push({ + code: warning.code, + ...(warning.table ? { table: warning.table } : {}), + ...(warning.metadata ? 
{ metadata: warning.metadata } : {}), + }), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24, concurrencyLimit: 2 }, + }); + + const description = await generator.generateTableDescription({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + table: { + catalog: null, + db: 'public', + name: 'customers', + columns: [ + { name: 'id', nativeType: 'uuid' }, + { name: 'email', nativeType: 'text', comment: 'Primary contact email' }, + ], + }, + }); + + expect(description).toBe('Customer reference data'); + expect(sampleTable).toHaveBeenCalledTimes(3); + expect(warnings.map((warning) => warning.code)).toEqual(['sampling_failed', 'description_fallback_used']); + expect(warnings[1]?.metadata?.reason).toBe('sampling_failed'); + const userPrompt = (vi.mocked(generateText).mock.calls.at(-1)?.[0] as { messages: Array<{ role: string; content: string }> }) + .messages.find((message) => message.role === 'user')?.content; + expect(userPrompt).toContain('Columns (metadata only, no sample rows)'); + expect(userPrompt).toContain('email (text)'); + expect(userPrompt).toContain('Primary contact email'); + }); + + it('emits enrichment_failed and returns null when both sampling and metadata-only LLM fail', async () => { + const sampleTable = vi + .fn>() + .mockRejectedValue(new Error('pool: connection refused')); + const connector: KtxScanConnector = { + ...createConnector(), + sampleTable, + }; + const warnings: string[] = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createFailingLlmProvider(), + onWarning: (warning) => warnings.push(warning.code), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const description = await generator.generateTableDescription({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + table: { catalog: null, db: 'public', name: 'orphan', columns: [{ name: 'id' }] }, + }); 
+ + expect(description).toBeNull(); + expect(warnings).toEqual(['sampling_failed', 'enrichment_failed']); + }); + + it('uses metadata-only fallback when connector has no sampleTable', async () => { + const connector = createConnector(); + const samplerWithoutTable: KtxScanConnector = { + ...connector, + sampleTable: undefined, + }; + const warnings: string[] = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Orders mart'), + onWarning: (warning) => warnings.push(warning.code), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const description = await generator.generateTableDescription({ + connectionId: 'conn-1', + connector: samplerWithoutTable, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + table: { + catalog: null, + db: 'public', + name: 'mart_orders', + columns: [{ name: 'order_id', nativeType: 'uuid' }], + }, + }); + + expect(description).toBe('Orders mart'); + expect(warnings).toEqual(['connector_capability_missing', 'description_fallback_used']); + }); + + it('aborts retry loop when the scan context signal fires', async () => { + const controller = new AbortController(); + const sampleTable = vi.fn>().mockImplementation(async () => { + controller.abort(); + throw new Error('first attempt blew up'); + }); + const connector: KtxScanConnector = { + ...createConnector(), + sampleTable, + }; + const warnings: string[] = []; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('should not be called'), + onWarning: (warning) => warnings.push(warning.code), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + await expect( + generator.generateTableDescription({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1', signal: controller.signal }, + dataSourceType: 'POSTGRESQL', + table: { catalog: null, db: 'public', name: 'orders' }, + }), + ).rejects.toThrow('aborted'); + + 
expect(sampleTable).toHaveBeenCalledTimes(1); + expect(warnings).toEqual([]); + }); + + it('generates column descriptions from rawDescriptions when sampleColumn is unavailable', async () => { + const samplerWithoutColumn: KtxScanConnector = { + ...createConnector(), + sampleColumn: undefined, + }; + const logger = createLogger(); + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Payment lifecycle state'), + logger, + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateColumnDescriptions({ + connectionId: 'conn-1', + connector: samplerWithoutColumn, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [{ name: 'status', rawDescriptions: { db: 'order lifecycle state' } }], + }, + }); + + expect(result.columnDescriptions).toEqual([['status', 'Payment lifecycle state']]); + expect(logger.warn).toHaveBeenCalled(); + const userPrompt = ( + vi.mocked(generateText).mock.calls.at(-1)?.[0] as { messages: Array<{ role: string; content: string }> } + ).messages.find((message) => message.role === 'user')?.content; + expect(userPrompt).toContain(' unavailable '); + expect(userPrompt).toContain(' order lifecycle state '); + }); + + it('generates column descriptions from rawDescriptions when sampleColumn retries exhaust', async () => { + const sampleColumn = vi + .fn>() + .mockRejectedValue(new Error('pool: connection refused')); + const flakyConnector: KtxScanConnector = { + ...createConnector(), + sampleColumn, + }; + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('Customer reference identifier'), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateColumnDescriptions({ + connectionId: 'conn-1', + connector: flakyConnector, + context: { runId: 'run-1' 
}, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [{ name: 'customer_id', rawDescriptions: { db: 'FK to customers.id' } }], + }, + }); + + expect(sampleColumn).toHaveBeenCalledTimes(3); + expect(result.columnDescriptions).toEqual([['customer_id', 'Customer reference identifier']]); + }); + + it('skips column LLM call only when neither samples nor rawDescriptions are available', async () => { + const sampleColumn = vi + .fn>() + .mockResolvedValue({ values: [null, null], nullCount: 2, distinctCount: 0 }); + const connector: KtxScanConnector = { + ...createConnector(), + sampleColumn, + }; + vi.mocked(generateText).mockClear(); + const generator = new KtxDescriptionGenerator({ + llmProvider: createLlmProvider('should not be called'), + settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 }, + }); + + const result = await generator.generateColumnDescriptions({ + connectionId: 'conn-1', + connector, + context: { runId: 'run-1' }, + dataSourceType: 'POSTGRESQL', + supportsNestedAnalysis: false, + table: { + catalog: null, + db: 'public', + name: 'orders', + columns: [{ name: 'opaque_blob' }], + }, + }); + + expect(result.columnDescriptions).toEqual([['opaque_blob', null]]); + expect(generateText).not.toHaveBeenCalled(); + }); +}); diff --git a/packages/context/src/scan/description-generation.ts b/packages/context/src/scan/description-generation.ts index 15f1d66e..184827ff 100644 --- a/packages/context/src/scan/description-generation.ts +++ b/packages/context/src/scan/description-generation.ts @@ -5,11 +5,18 @@ import type { KtxColumnSampleResult, KtxScanContext, KtxScanLoggerPort, + KtxScanWarning, KtxTableRef, KtxTableSampleInput, KtxTableSampleResult, } from './types.js'; +interface KtxDescriptionTableColumn { + name: string; + nativeType?: string | null; + comment?: string | null; +} + export interface KtxDescriptionCachePort { buildTableKey(table: 
KtxTableRef): string; buildColumnKey(table: KtxTableRef, columnName: string): string; @@ -53,6 +60,7 @@ export interface KtxDescriptionColumnTable extends KtxTableRef { export interface KtxDescriptionTableInput extends KtxTableRef { rawDescriptions?: Record; + columns?: KtxDescriptionTableColumn[]; } export interface KtxColumnAnalysisResult { @@ -72,7 +80,8 @@ export interface KtxColumnDescriptionPromptInput { export interface KtxTableDescriptionPromptInput { tableName: string; - sampleData: KtxTableSampleResult; + sampleData?: KtxTableSampleResult; + columns?: KtxDescriptionTableColumn[]; dataSourceType: string; rawDescriptions?: Record; } @@ -114,6 +123,7 @@ export interface KtxDescriptionGeneratorOptions { llmProvider: KtxLlmProvider; cache?: KtxDescriptionCachePort; logger?: KtxScanLoggerPort; + onWarning?: (warning: KtxScanWarning) => void; settings: KtxDescriptionGenerationSettings; } @@ -136,6 +146,66 @@ function errorMessage(error: unknown): string { return error instanceof Error ? 
/**
 * Error used to signal that work was cancelled through an `AbortSignal`.
 * Its message is always `'aborted'`; callers distinguish it with `instanceof`.
 */
class KtxAbortedError extends Error {
  constructor() {
    super('aborted');
    this.name = 'KtxAbortedError';
  }
}

/**
 * Waits `ms` milliseconds, rejecting early if `signal` aborts.
 *
 * @param ms - Delay in milliseconds.
 * @param signal - Optional abort signal; when omitted this is a plain timer.
 * @throws KtxAbortedError if `signal` is already aborted or aborts during the wait.
 */
async function delayWithAbort(ms: number, signal?: AbortSignal): Promise<void> {
  if (!signal) {
    await new Promise<void>((resolve) => setTimeout(resolve, ms));
    return;
  }
  if (signal.aborted) {
    throw new KtxAbortedError();
  }
  await new Promise<void>((resolve, reject) => {
    const timer = setTimeout(() => {
      // Timer won the race: detach the listener so it does not linger on the signal.
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    const onAbort = (): void => {
      clearTimeout(timer);
      reject(new KtxAbortedError());
    };
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

/** Options accepted by {@link retryAsync}. */
interface RetryAsyncOptions {
  /** Maximum number of calls to `fn`; values below 1 (or non-finite) are treated as 1. */
  attempts: number;
  /** Delay before the second attempt; doubled for each subsequent attempt. */
  baseDelayMs: number;
  /** When aborted, retrying stops and `KtxAbortedError` is thrown. */
  signal?: AbortSignal;
  /** Invoked after every failed attempt with the error and the 1-based attempt number. */
  onAttemptFailure?: (error: unknown, attempt: number) => void;
}

/**
 * Calls `fn` until it resolves, waiting `baseDelayMs * 2 ** (attempt - 1)`
 * between attempts.
 *
 * @param fn - Operation to run; invoked at most `options.attempts` times.
 * @param options - Attempt budget, backoff base, optional abort signal and failure hook.
 * @returns The first successful result of `fn`.
 * @throws KtxAbortedError if the signal is aborted before an attempt, after a failed
 *   attempt (including the final one), or during a backoff wait.
 * @throws The last error thrown by `fn` once all attempts are used up.
 */
async function retryAsync<T>(fn: () => Promise<T>, options: RetryAsyncOptions): Promise<T> {
  // A non-finite budget (e.g. NaN) would otherwise skip the loop and throw `undefined`.
  const attempts = Number.isFinite(options.attempts) ? Math.max(1, Math.floor(options.attempts)) : 1;
  let lastError: unknown;
  for (let attempt = 1; attempt <= attempts; attempt += 1) {
    if (options.signal?.aborted) {
      throw new KtxAbortedError();
    }
    try {
      return await fn();
    } catch (error) {
      lastError = error;
      if (error instanceof KtxAbortedError) {
        throw error;
      }
      options.onAttemptFailure?.(error, attempt);
      // Surface cancellation even on the final attempt, so callers do not mistake
      // an aborted run for an ordinary failure and proceed to fallback work.
      if (options.signal?.aborted) {
        throw new KtxAbortedError();
      }
      if (attempt === attempts) {
        break;
      }
      const delay = options.baseDelayMs * 2 ** (attempt - 1);
      await delayWithAbort(delay, options.signal);
    }
  }
  throw lastError;
}
valuesStr : 'unavailable'; let user = ` ${input.tableContext} ${input.columnName} - ${valuesStr} + ${sampleValuesContent} `; const sources = descriptionSources(input.rawDescriptions); @@ -228,16 +299,6 @@ Example: export function buildKtxTableDescriptionPrompt( input: KtxTableDescriptionPromptInput & { maxWords?: number }, ): KtxDescriptionPrompt { - const columnInfo: string[] = []; - for (let index = 0; index < Math.min(input.sampleData.headers.length, 10); index += 1) { - const header = input.sampleData.headers[index]; - const sampleValues = input.sampleData.rows - .slice(0, 3) - .map((row) => row[index]) - .filter((value) => value !== null && value !== undefined); - columnInfo.push(`${header}: ${sampleValues.map((value) => String(value)).join(', ')}`); - } - const systemParts: string[] = [ `Analyze database tables and provide a concise description. @@ -256,9 +317,38 @@ Example: "Information about healthcare professionals used for workforce manageme systemParts.push(wordLimitLine(input.maxWords)); } + const hasSamples = !!input.sampleData && input.sampleData.rows.length > 0; + let columnsLine: string; + let rowsLine: string; + if (hasSamples) { + const sampleData = input.sampleData!; + const columnInfo: string[] = []; + for (let index = 0; index < Math.min(sampleData.headers.length, 10); index += 1) { + const header = sampleData.headers[index]; + const sampleValues = sampleData.rows + .slice(0, 3) + .map((row) => row[index]) + .filter((value) => value !== null && value !== undefined); + columnInfo.push(`${header}: ${sampleValues.map((value) => String(value)).join(', ')}`); + } + columnsLine = `Columns and sample data: ${columnInfo.join(' | ')}`; + rowsLine = `Total rows in sample: ${sampleData.rows.length}`; + } else if (input.columns && input.columns.length > 0) { + const columnInfo = input.columns.slice(0, 30).map((column) => { + const typePart = column.nativeType ? ` (${column.nativeType})` : ''; + const commentPart = column.comment ? 
` — ${column.comment}` : ''; + return `${column.name}${typePart}${commentPart}`; + }); + columnsLine = `Columns (metadata only, no sample rows): ${columnInfo.join(' | ')}`; + rowsLine = 'Sample rows: unavailable'; + } else { + columnsLine = 'Columns: unavailable'; + rowsLine = 'Sample rows: unavailable'; + } + let user = `Table: ${input.tableName} -Columns and sample data: ${columnInfo.join(' | ')} -Total rows in sample: ${input.sampleData.rows.length} +${columnsLine} +${rowsLine} Data source type: ${input.dataSourceType}`; const sources = descriptionSources(input.rawDescriptions); @@ -313,12 +403,14 @@ export class KtxDescriptionGenerator { private readonly llmProvider: KtxLlmProvider; private readonly cache?: KtxDescriptionCachePort; private readonly logger?: KtxScanLoggerPort; + private readonly onWarning?: (warning: KtxScanWarning) => void; private readonly settings: ResolvedKtxDescriptionGenerationSettings; constructor(options: KtxDescriptionGeneratorOptions) { this.llmProvider = options.llmProvider; this.cache = options.cache; this.logger = options.logger; + this.onWarning = options.onWarning; this.settings = { columnMaxWords: options.settings.columnMaxWords, tableMaxWords: options.settings.tableMaxWords, @@ -366,26 +458,82 @@ export class KtxDescriptionGenerator { } } - if (!input.connector.sampleTable) { - this.logger?.warn('KTX scan connector does not support table sampling for table description generation', { + const sampleTable = input.connector.sampleTable; + let sampleData: KtxTableSampleResult | null = null; + let fallbackReason: 'capability_missing' | 'sampling_failed' | 'empty_sample' | null = null; + + if (!sampleTable) { + fallbackReason = 'capability_missing'; + this.logger?.warn('KTX scan connector does not support table sampling; falling back to metadata-only prompt', { connectorId: input.connector.id, table: input.table.name, }); - return null; + this.onWarning?.({ + code: 'connector_capability_missing', + message: `Connector 
${input.connector.id} does not support sampleTable; using metadata-only description prompt`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, capability: 'sampleTable' }, + }); + } else { + try { + sampleData = await retryAsync( + () => + sampleTable( + { + connectionId: input.connectionId, + table: tableRef, + limit: 20, + }, + input.context, + ), + { + attempts: 3, + baseDelayMs: 200, + signal: input.context.signal, + onAttemptFailure: (error, attempt) => { + this.logger?.warn( + `sampleTable attempt ${attempt} failed for ${input.table.name}: ${errorMessage(error)}`, + { + connectorId: input.connector.id, + table: input.table.name, + attempt, + }, + ); + }, + }, + ); + if (sampleData.rows.length === 0) { + fallbackReason = 'empty_sample'; + this.logger?.warn('sampleTable returned no rows; using metadata-only prompt', { + connectorId: input.connector.id, + table: input.table.name, + }); + } + } catch (error) { + if (error instanceof KtxAbortedError) { + throw error; + } + fallbackReason = 'sampling_failed'; + this.logger?.error(`sampleTable exhausted retries for ${input.table.name}: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'sampling_failed', + message: `Failed to sample table ${input.table.name} after retries: ${errorMessage(error)}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, error: errorMessage(error) }, + }); + } } try { - const sampleData = await input.connector.sampleTable( - { - connectionId: input.connectionId, - table: tableRef, - limit: 20, - }, - input.context, - ); const prompt = buildKtxTableDescriptionPrompt({ tableName: input.table.name, - sampleData, + ...(fallbackReason === null && sampleData ? { sampleData } : {}), + ...(input.table.columns && input.table.columns.length > 0 ? 
{ columns: input.table.columns } : {}), dataSourceType: input.dataSourceType, rawDescriptions: input.table.rawDescriptions, maxWords: this.settings.tableMaxWords, @@ -394,9 +542,37 @@ export class KtxDescriptionGenerator { if (cacheKey && description) { await this.cache?.set(cacheKey, description); } + if (description && fallbackReason !== null) { + this.onWarning?.({ + code: 'description_fallback_used', + message: `Generated table description without sample rows for ${input.table.name} (reason: ${fallbackReason})`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, reason: fallbackReason }, + }); + } + if (!description) { + this.onWarning?.({ + code: 'enrichment_failed', + message: `Failed to generate description for table ${input.table.name}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id, usedFallback: fallbackReason !== null }, + }); + } return description; } catch (error) { - this.logger?.error(`Error generating table description: ${errorMessage(error)}`); + this.logger?.error(`Error generating table description: ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + }); + this.onWarning?.({ + code: 'enrichment_failed', + message: `Failed to generate description for table ${input.table.name}: ${errorMessage(error)}`, + table: input.table.name, + recoverable: true, + metadata: { connectorId: input.connector.id }, + }); return null; } } @@ -496,33 +672,64 @@ export class KtxDescriptionGenerator { let columnValues = column.sampleValues; if (!columnValues || columnValues.length === 0) { if (!input.connector.sampleColumn) { - this.logger?.warn('KTX scan connector does not support column sampling for column description generation', { + this.logger?.warn('KTX scan connector does not support column sampling; using available metadata only', { connectorId: input.connector.id, table: input.table.name, column: column.name, }); - return { - 
columnName: column.name, - description: null, - skipped: false, - processed: false, - }; + columnValues = []; + } else { + const sampleColumn = input.connector.sampleColumn; + try { + const sample = await retryAsync( + () => + sampleColumn( + { + connectionId: input.connectionId, + table: tableRef, + column: column.name, + limit: 50, + }, + input.context, + ), + { + attempts: 3, + baseDelayMs: 200, + signal: input.context.signal, + onAttemptFailure: (error, attempt) => { + this.logger?.warn( + `sampleColumn attempt ${attempt} failed for ${input.table.name}.${column.name}: ${errorMessage(error)}`, + { + connectorId: input.connector.id, + table: input.table.name, + column: column.name, + attempt, + }, + ); + }, + }, + ); + columnValues = sample.values; + } catch (error) { + if (error instanceof KtxAbortedError) { + throw error; + } + this.logger?.warn( + `sampleColumn exhausted retries for ${input.table.name}.${column.name}; using available metadata only: ${errorMessage(error)}`, + { + connectorId: input.connector.id, + table: input.table.name, + column: column.name, + }, + ); + columnValues = []; + } } - - const sample = await input.connector.sampleColumn( - { - connectionId: input.connectionId, - table: tableRef, - column: column.name, - limit: 50, - }, - input.context, - ); - columnValues = sample.values; } const nonNullValues = (columnValues ?? 
[]).filter((value) => value !== null && value !== undefined); - if (nonNullValues.length === 0) { + const hasRawDescriptions = descriptionSources(column.rawDescriptions).length > 0; + if (nonNullValues.length === 0 && !hasRawDescriptions) { return { columnName: column.name, description: null, @@ -553,7 +760,14 @@ export class KtxDescriptionGenerator { processed: description !== null, }; } catch (error) { - this.logger?.error(`Error analyzing column '${column.name}': ${errorMessage(error)}`); + if (error instanceof KtxAbortedError) { + throw error; + } + this.logger?.error(`Error analyzing column '${column.name}': ${errorMessage(error)}`, { + connectorId: input.connector.id, + table: input.table.name, + column: column.name, + }); return { columnName: column.name, description: null, diff --git a/packages/context/src/scan/local-enrichment.test.ts b/packages/context/src/scan/local-enrichment.test.ts index cbed687d..62a3903d 100644 --- a/packages/context/src/scan/local-enrichment.test.ts +++ b/packages/context/src/scan/local-enrichment.test.ts @@ -404,6 +404,41 @@ describe('local scan enrichment', () => { expect(result.resolvedRelationships).toBeNull(); }); + it('forwards context.logger and emits warnings when sampleTable fails repeatedly', async () => { + const failingConnector: KtxScanConnector = { + ...connector(), + sampleTable: vi.fn(async () => { + throw new Error('pool: ECONNRESET'); + }), + }; + const logger = { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + }; + + const result = await runLocalScanEnrichment({ + connectionId: 'warehouse', + mode: 'enriched', + detectRelationships: false, + connector: failingConnector, + context: { runId: 'scan-run-warnings', logger }, + providers: createDeterministicLocalScanEnrichmentProviders({ embeddingDimensions: 6 }), + }); + + const codes = result.warnings.map((warning) => warning.code); + expect(codes).toContain('sampling_failed'); + expect(codes).toContain('description_fallback_used'); + 
expect(result.warnings.some((warning) => warning.table === 'customers')).toBe(true); + expect(logger.warn).toHaveBeenCalled(); + expect(logger.error).toHaveBeenCalled(); + // Each of the two tables produced sampling_failed + description_fallback_used, so 2 + 2 = 4 warnings minimum. + expect(result.warnings.length).toBeGreaterThanOrEqual(4); + // Sampling was retried 3× for each of the 2 tables = 6 calls + expect(failingConnector.sampleTable).toHaveBeenCalledTimes(6); + }); + it('runs configured deterministic enrichment with descriptions and embeddings', async () => { const result = await runLocalScanEnrichment({ connectionId: 'warehouse', diff --git a/packages/context/src/scan/local-enrichment.ts b/packages/context/src/scan/local-enrichment.ts index 5d58e189..c820b8a7 100644 --- a/packages/context/src/scan/local-enrichment.ts +++ b/packages/context/src/scan/local-enrichment.ts @@ -298,6 +298,18 @@ function descriptionTable(table: KtxSchemaTable): KtxDescriptionColumnTable { }; } +function tableMetadataColumns(table: KtxSchemaTable): Array<{ + name: string; + nativeType?: string | null; + comment?: string | null; +}> { + return table.columns.map((column) => ({ + name: column.name, + nativeType: column.nativeType ?? null, + comment: column.comment ?? null, + })); +} + function embeddingBatchSize(maxBatchSize: number): number { return Number.isInteger(maxBatchSize) && maxBatchSize > 0 ? maxBatchSize : 100; } @@ -308,9 +320,19 @@ async function generateDescriptions(input: { context: KtxScanContext; providers: KtxLocalScanEnrichmentProviders; progress?: KtxProgressPort; + warnings?: KtxScanWarning[]; }): Promise { + const warningSink = input.warnings; const generator = new KtxDescriptionGenerator({ llmProvider: input.providers.llm, + ...(input.context.logger ? { logger: input.context.logger } : {}), + ...(warningSink + ? 
{ + onWarning: (warning: KtxScanWarning) => { + warningSink.push(warning); + }, + } + : {}), settings: { columnMaxWords: 16, tableMaxWords: 24, @@ -355,6 +377,7 @@ async function generateDescriptions(input: { db: table.db, name: table.name, rawDescriptions: table.comment ? { db: table.comment } : {}, + columns: tableMetadataColumns(table), }, }); return { @@ -560,6 +583,7 @@ export async function runLocalScanEnrichment( context: input.context, providers, progress: descriptionProgress, + warnings, }), }); const embeddingProgress = progress?.startPhase(0.2); diff --git a/packages/context/src/scan/relationship-llm-proposal.test.ts b/packages/context/src/scan/relationship-llm-proposal.test.ts index eb05638e..ed43ff02 100644 --- a/packages/context/src/scan/relationship-llm-proposal.test.ts +++ b/packages/context/src/scan/relationship-llm-proposal.test.ts @@ -166,11 +166,11 @@ describe('relationship LLM proposals', () => { }); expect(generateText).toHaveBeenCalledWith( expect.objectContaining({ + system: expect.objectContaining({ + role: 'system', + content: expect.stringContaining('You are helping KTX review possible SQL relationships'), + }), messages: expect.arrayContaining([ - expect.objectContaining({ - role: 'system', - content: expect.stringContaining('You are helping KTX review possible SQL relationships'), - }), expect.objectContaining({ role: 'user', content: expect.stringContaining('"tables"'), @@ -178,9 +178,12 @@ describe('relationship LLM proposals', () => { ]), }), ); - const call = (generateText.mock.calls as unknown as Array<[{ messages: Array<{ role: string; content: string }> }]>)[0]?.[0]; + const call = ( + generateText.mock.calls as unknown as Array<[{ messages: Array<{ role: string; content: string }> }]> + )[0]?.[0]; const userMessage = call?.messages.find((m) => m.role === 'user'); expect(userMessage?.content).not.toContain('You are helping KTX review possible SQL relationships'); + expect(call?.messages.some((m) => m.role === 
'system')).toBe(false); }); it('skips deterministic providers without calling generateText', async () => { diff --git a/packages/context/src/scan/types.ts b/packages/context/src/scan/types.ts index c21d21bf..bc8959f5 100644 --- a/packages/context/src/scan/types.ts +++ b/packages/context/src/scan/types.ts @@ -345,7 +345,8 @@ export type KtxScanWarningCode = | 'relationship_llm_invalid_reference' | 'relationship_llm_proposal_failed' | 'credential_redacted' - | 'enrichment_failed'; + | 'enrichment_failed' + | 'description_fallback_used'; export interface KtxScanWarning { code: KtxScanWarningCode; diff --git a/packages/llm/src/index.ts b/packages/llm/src/index.ts index 164ba183..ab2ad341 100644 --- a/packages/llm/src/index.ts +++ b/packages/llm/src/index.ts @@ -1,6 +1,7 @@ export { createKtxEmbeddingProvider } from './embedding-provider.js'; export { runKtxEmbeddingHealthCheck } from './embedding-health.js'; -export { KtxMessageBuilder } from './message-builder.js'; +export { KtxMessageBuilder, splitKtxSystemMessages } from './message-builder.js'; +export type { KtxSplitSystemMessagesResult } from './message-builder.js'; export type { KtxEmbeddingHealthCheckOptions, KtxEmbeddingHealthCheckResult } from './embedding-health.js'; export type { KtxEmbeddingProviderDeps } from './embedding-provider.js'; export type { KtxLlmHealthCheckDeps, KtxLlmHealthCheckOptions, KtxLlmHealthCheckResult } from './model-health.js'; diff --git a/packages/llm/src/message-builder.test.ts b/packages/llm/src/message-builder.test.ts index bc13a7e1..60f7d948 100644 --- a/packages/llm/src/message-builder.test.ts +++ b/packages/llm/src/message-builder.test.ts @@ -1,6 +1,6 @@ import type { ModelMessage } from 'ai'; import { describe, expect, it } from 'vitest'; -import { KtxMessageBuilder } from './message-builder.js'; +import { KtxMessageBuilder, splitKtxSystemMessages } from './message-builder.js'; import { createKtxLlmProvider } from './model-provider.js'; function makeBuilder(overrides: 
Parameters[0]['promptCaching'] = {}) { @@ -111,3 +111,36 @@ describe('KtxMessageBuilder.build', () => { expect((out.tools.z as { providerOptions: any }).providerOptions.anthropic.cacheControl.ttl).toBe('5m'); }); }); + +describe('splitKtxSystemMessages', () => { + it('returns undefined system when no system messages are present', () => { + const split = splitKtxSystemMessages([ + { role: 'user', content: 'hello' }, + { role: 'assistant', content: 'hi' }, + ]); + expect(split.system).toBeUndefined(); + expect(split.messages).toHaveLength(2); + }); + + it('returns a single system message object when one system message is present, preserving providerOptions', () => { + const systemMessage = { + role: 'system' as const, + content: 'You are helpful.', + providerOptions: { anthropic: { cacheControl: { type: 'ephemeral' } } }, + }; + const split = splitKtxSystemMessages([systemMessage, { role: 'user', content: 'hello' }]); + expect(split.system).toBe(systemMessage); + expect(split.messages).toEqual([{ role: 'user', content: 'hello' }]); + }); + + it('returns an array of system messages when multiple are present, in order', () => { + const split = splitKtxSystemMessages([ + { role: 'system', content: 'cached' }, + { role: 'system', content: 'fresh' }, + { role: 'user', content: 'hello' }, + ]); + expect(Array.isArray(split.system)).toBe(true); + expect(split.system).toHaveLength(2); + expect(split.messages).toEqual([{ role: 'user', content: 'hello' }]); + }); +}); diff --git a/packages/llm/src/message-builder.ts b/packages/llm/src/message-builder.ts index a98a0375..f3336f3c 100644 --- a/packages/llm/src/message-builder.ts +++ b/packages/llm/src/message-builder.ts @@ -1,7 +1,29 @@ -import type { LanguageModel, ModelMessage, ToolSet } from 'ai'; +import type { LanguageModel, ModelMessage, SystemModelMessage, ToolSet } from 'ai'; import { isAnthropicProtocolModel } from './model-provider.js'; import type { KtxLlmProvider, KtxPromptCacheTtl, KtxPromptParts } from './types.js'; 
+export interface KtxSplitSystemMessagesResult { + system: SystemModelMessage | SystemModelMessage[] | undefined; + messages: ModelMessage[]; +} + +export function splitKtxSystemMessages(messages: readonly ModelMessage[]): KtxSplitSystemMessagesResult { + const systemMessages: SystemModelMessage[] = []; + const otherMessages: ModelMessage[] = []; + for (const message of messages) { + if (message.role === 'system') { + systemMessages.push(message); + } else { + otherMessages.push(message); + } + } + return { + system: + systemMessages.length === 0 ? undefined : systemMessages.length === 1 ? systemMessages[0] : systemMessages, + messages: otherMessages, + }; +} + type ToolMap = ToolSet | Record>; interface KtxMessageBuilderOptions {