fix(context): make scan description generation resilient and quiet

A transient sampleTable failure during ingest used to take out every
table in a connection: generateTableDescription returned a hardcoded
'Table not found' string into descriptions.ai, and KtxDescriptionGenerator
was constructed without a logger, so the failure left no trail anywhere.

- sampleTable / sampleColumn calls retry 3x with 200/400/800ms backoff,
  honouring KtxScanContext.signal via a new KtxAbortedError.
- On retry exhaustion or missing capability, table generation falls back
  to a metadata-only prompt built from column name / native type / comment
  / rawDescriptions. The column path follows the same rule -- call the
  LLM when any of samples or rawDescriptions are available; skip only
  when both are absent.
- Logger is now threaded from KtxScanContext into the generator. Failures
  emit structured KtxScanWarning entries (new description_fallback_used
  code, plus existing sampling_failed / enrichment_failed /
  connector_capability_missing). ktx scan groups warnings by code so a
  batch of identical failures collapses to one summary line plus sample.
- Returns null on failure instead of the 'Table not found' sentinel; the
  manifest writer's existing guard already skips empty descriptions, so
  schema YAML no longer carries misleading text. SCAN_MANAGED_DESCRIPTION_KEYS
  already strips stale 'ai' on merge, so existing YAML clears on next run.

Also suppress AI SDK v6 'system in messages' warning: pull system messages
out of KtxMessageBuilder.wrapSimple's output via a new splitKtxSystemMessages
helper and pass them top-level to generateText (preserves cacheControl
providerOptions on the SystemModelMessage). Agent-runner's local
splitSystemPromptMessages dedupes onto the shared helper.
This commit is contained in:
Andrey Avtomonov 2026-05-15 01:33:50 +02:00
parent f561bfa850
commit 81b67a96cb
14 changed files with 761 additions and 82 deletions

View file

@ -133,6 +133,50 @@ function warningLine(warning: KtxScanWarning): string {
return `${warning.code}: ${location}${warning.message}`;
}
function groupWarningsByCode(warnings: readonly KtxScanWarning[]): Map<string, KtxScanWarning[]> {
const groups = new Map<string, KtxScanWarning[]>();
for (const warning of warnings) {
const list = groups.get(warning.code);
if (list) {
list.push(warning);
} else {
groups.set(warning.code, [warning]);
}
}
return groups;
}
function describeWarningGroup(code: string, count: number): string {
switch (code) {
case 'sampling_failed':
return `${count} ${plural(count, 'table')} could not be sampled (retries exhausted); descriptions used metadata-only fallback or were skipped.`;
case 'description_fallback_used':
return `${count} ${plural(count, 'table')} got an AI description from column metadata only (no sample rows available).`;
case 'enrichment_failed':
return `${count} ${plural(count, 'table/column')} could not be enriched.`;
case 'connector_capability_missing':
return `${count} ${plural(count, 'table')} affected by missing connector capability.`;
case 'statistics_failed':
return `${count} statistics ${plural(count, 'lookup')} failed.`;
case 'llm_unavailable':
return 'LLM provider unavailable; AI enrichment was skipped.';
case 'embedding_unavailable':
return 'Embedding provider unavailable; embeddings were skipped.';
case 'relationship_validation_failed':
return `${count} relationship ${plural(count, 'validation')} could not run.`;
case 'relationship_llm_invalid_reference':
return `${count} LLM-proposed ${plural(count, 'relationship')} referenced unknown columns.`;
case 'relationship_llm_proposal_failed':
return `${count} LLM relationship ${plural(count, 'proposal')} failed.`;
case 'scan_enrichment_backend_not_configured':
return 'Scan enrichment backend is not configured; AI stages were skipped.';
case 'credential_redacted':
return `${count} ${plural(count, 'credential')} were redacted from scan output.`;
default:
return `${count} ${plural(count, 'warning')} (${code})`;
}
}
function managedDaemonOptionsForScanRun(args: Extract<KtxScanArgs, { command: 'run' }>, io: KtxCliIo) {
if (args.databaseIntrospectionUrl || !args.cliVersion || !args.runtimeInstallPolicy) {
return undefined;
@ -153,11 +197,26 @@ function writeNeedsAttention(report: KtxScanReport, io: KtxCliIo): void {
}
if (report.warnings.length > 0) {
io.stdout.write(` ${report.warnings.length} ${plural(report.warnings.length, 'warning')}\n`);
for (const warning of report.warnings.slice(0, 5)) {
io.stdout.write(` - ${warningLine(warning)}\n`);
}
if (report.warnings.length > 5) {
io.stdout.write(` - ${report.warnings.length - 5} more warnings in the JSON report\n`);
const groups = groupWarningsByCode(report.warnings);
for (const [code, warnings] of groups) {
io.stdout.write(` - ${describeWarningGroup(code, warnings.length)}\n`);
const first = warnings[0];
if (first) {
io.stdout.write(` ${warningLine(first)}\n`);
}
if (warnings.length > 1) {
const moreTables = warnings
.slice(1)
.map((warning) =>
warning.table ? (warning.column ? `${warning.table}.${warning.column}` : warning.table) : null,
)
.filter((value): value is string => value !== null)
.slice(0, 3);
if (moreTables.length > 0) {
const suffix = warnings.length - 1 > moreTables.length ? `, …` : '';
io.stdout.write(` also: ${moreTables.join(', ')}${suffix}\n`);
}
}
}
}
if (report.capabilityGaps.length > 0) {

View file

@ -1,4 +1,4 @@
import { KtxMessageBuilder, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm';
import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm';
import { generateText, stepCountIs, type TelemetrySettings, type Tool } from 'ai';
import { noopLogger, type KtxLogger } from '../core/index.js';
import { summarizeKtxLlmDebugRequest, type KtxLlmDebugRequestRecorder } from '../llm/index.js';
@ -36,14 +36,6 @@ export interface AgentRunnerServiceDeps {
logger?: KtxLogger;
}
function splitSystemPromptMessages(messages: ReturnType<KtxMessageBuilder['wrapSimple']>['messages']) {
const systemMessages = messages.filter((message) => message.role === 'system');
return {
system: systemMessages.length === 0 ? undefined : systemMessages.length === 1 ? systemMessages[0] : systemMessages,
messages: messages.filter((message) => message.role !== 'system'),
};
}
export class AgentRunnerService {
private readonly logger: KtxLogger;
@ -62,7 +54,7 @@ export class AgentRunnerService {
tools: params.toolSet,
model,
});
const promptMessages = splitSystemPromptMessages(built.messages);
const promptMessages = splitKtxSystemMessages(built.messages);
await this.deps.debugRequestRecorder?.record(
summarizeKtxLlmDebugRequest({

View file

@ -227,9 +227,10 @@ describe('PageTriageService', () => {
});
generateTextMock
.mockImplementationOnce((args: any) => {
const systemMessage = args.messages.find((m: { role: string }) => m.role === 'system');
const systemMessage = args.system ?? args.messages.find((m: { role: string }) => m.role === 'system');
const userMessage = args.messages.find((m: { role: string }) => m.role === 'user');
const systemText = systemMessage.content as string;
const systemText =
typeof systemMessage === 'string' ? systemMessage : (systemMessage.content as string);
const userText = userMessage.content as string;
expect(systemText).toContain(
'Reusable templates and scripts are durable knowledge regardless of subject matter.',

View file

@ -1,7 +1,7 @@
import { createHash } from 'node:crypto';
import { readdir, readFile } from 'node:fs/promises';
import { dirname, join, relative } from 'node:path';
import { KtxMessageBuilder, type KtxLlmProvider } from '@ktx/llm';
import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider } from '@ktx/llm';
import { generateText, type ToolSet } from 'ai';
import pLimit from 'p-limit';
import { z } from 'zod';
@ -346,10 +346,12 @@ export class PageTriageService {
tools: {},
model,
});
const split = splitKtxSystemMessages(built.messages);
const result = await this.runGenerateText({
model,
temperature: 0,
messages: built.messages,
...(split.system ? { system: split.system } : {}),
messages: split.messages,
tools: built.tools as ToolSet,
});
return result.text;

View file

@ -1,4 +1,4 @@
import { KtxMessageBuilder, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm';
import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider, type KtxModelRole } from '@ktx/llm';
import { generateText, Output, type FlexibleSchema, type ToolSet } from 'ai';
type GenerateTextInput = Parameters<typeof generateText>[0];
@ -29,10 +29,12 @@ export async function generateKtxText(input: GenerateKtxTextInput): Promise<stri
tools: input.tools ?? {},
model,
});
const split = splitKtxSystemMessages(built.messages);
const result = await (input.generateText ?? generateText)({
model,
temperature: input.temperature ?? 0,
messages: built.messages,
...(split.system ? { system: split.system } : {}),
messages: split.messages,
tools: built.tools as ToolSet,
...(hasTools(built.tools as ToolSet)
? {
@ -58,10 +60,12 @@ export async function generateKtxObject<TOutput, TSchema>(
tools: input.tools ?? {},
model,
});
const split = splitKtxSystemMessages(built.messages);
const result = await (input.generateText ?? generateText)({
model,
temperature: input.temperature ?? 0,
messages: built.messages,
...(split.system ? { system: split.system } : {}),
messages: split.messages,
tools: built.tools as ToolSet,
...(hasTools(built.tools as ToolSet)
? {

View file

@ -203,11 +203,11 @@ describe('KtxDescriptionGenerator', () => {
expect(generateText).toHaveBeenCalledWith(
expect.objectContaining({
temperature: 0.2,
system: expect.objectContaining({
role: 'system',
content: expect.stringContaining('Please provide a concise description in 12 words or less.'),
}),
messages: expect.arrayContaining([
expect.objectContaining({
role: 'system',
content: expect.stringContaining('Please provide a concise description in 12 words or less.'),
}),
expect.objectContaining({
role: 'user',
content: expect.stringContaining('<column_name> status </column_name>'),
@ -215,6 +215,8 @@ describe('KtxDescriptionGenerator', () => {
]),
}),
);
const lastCall = vi.mocked(generateText).mock.calls.at(-1)?.[0];
expect(lastCall?.messages?.some((message) => message.role === 'system')).toBe(false);
});
it('samples through the connector when column values are not pre-fetched', async () => {
@ -391,3 +393,289 @@ describe('KtxDescriptionGenerator', () => {
expect(cache.set).toHaveBeenCalledWith('__connection:Warehouse', 'Commerce orders');
});
});
describe('KtxDescriptionGenerator resilience', () => {
function createLogger() {
return {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
}
it('retries sampleTable on transient failure and uses sampled rows when it eventually succeeds', async () => {
const sampleTable = vi
.fn<NonNullable<KtxScanConnector['sampleTable']>>()
.mockRejectedValueOnce(new Error('pool: transient ECONNRESET'))
.mockRejectedValueOnce(new Error('pool: transient ECONNRESET'))
.mockResolvedValue({
headers: ['id', 'status'],
rows: [
[1, 'paid'],
[2, 'refunded'],
],
totalRows: 2,
});
const connector: KtxScanConnector = {
...createConnector(),
sampleTable,
};
const logger = createLogger();
const warnings: Array<{ code: string; table?: string }> = [];
const generator = new KtxDescriptionGenerator({
llmProvider: createLlmProvider('Commerce orders'),
logger,
onWarning: (warning) => warnings.push({ code: warning.code, ...(warning.table ? { table: warning.table } : {}) }),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24, concurrencyLimit: 2 },
});
const description = await generator.generateTableDescription({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
table: { catalog: null, db: 'public', name: 'orders' },
});
expect(description).toBe('Commerce orders');
expect(sampleTable).toHaveBeenCalledTimes(3);
expect(logger.warn).toHaveBeenCalledTimes(2);
expect(warnings).toEqual([]);
});
it('falls back to metadata-only prompt when sampleTable retries exhaust', async () => {
const sampleTable = vi
.fn<NonNullable<KtxScanConnector['sampleTable']>>()
.mockRejectedValue(new Error('pool: connection refused'));
const connector: KtxScanConnector = {
...createConnector(),
sampleTable,
};
const logger = createLogger();
const warnings: Array<{ code: string; table?: string; metadata?: Record<string, unknown> }> = [];
const generator = new KtxDescriptionGenerator({
llmProvider: createLlmProvider('Customer reference data'),
logger,
onWarning: (warning) =>
warnings.push({
code: warning.code,
...(warning.table ? { table: warning.table } : {}),
...(warning.metadata ? { metadata: warning.metadata } : {}),
}),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24, concurrencyLimit: 2 },
});
const description = await generator.generateTableDescription({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
table: {
catalog: null,
db: 'public',
name: 'customers',
columns: [
{ name: 'id', nativeType: 'uuid' },
{ name: 'email', nativeType: 'text', comment: 'Primary contact email' },
],
},
});
expect(description).toBe('Customer reference data');
expect(sampleTable).toHaveBeenCalledTimes(3);
expect(warnings.map((warning) => warning.code)).toEqual(['sampling_failed', 'description_fallback_used']);
expect(warnings[1]?.metadata?.reason).toBe('sampling_failed');
const userPrompt = (vi.mocked(generateText).mock.calls.at(-1)?.[0] as { messages: Array<{ role: string; content: string }> })
.messages.find((message) => message.role === 'user')?.content;
expect(userPrompt).toContain('Columns (metadata only, no sample rows)');
expect(userPrompt).toContain('email (text)');
expect(userPrompt).toContain('Primary contact email');
});
it('emits enrichment_failed and returns null when both sampling and metadata-only LLM fail', async () => {
const sampleTable = vi
.fn<NonNullable<KtxScanConnector['sampleTable']>>()
.mockRejectedValue(new Error('pool: connection refused'));
const connector: KtxScanConnector = {
...createConnector(),
sampleTable,
};
const warnings: string[] = [];
const generator = new KtxDescriptionGenerator({
llmProvider: createFailingLlmProvider(),
onWarning: (warning) => warnings.push(warning.code),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const description = await generator.generateTableDescription({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
table: { catalog: null, db: 'public', name: 'orphan', columns: [{ name: 'id' }] },
});
expect(description).toBeNull();
expect(warnings).toEqual(['sampling_failed', 'enrichment_failed']);
});
it('uses metadata-only fallback when connector has no sampleTable', async () => {
const connector = createConnector();
const samplerWithoutTable: KtxScanConnector = {
...connector,
sampleTable: undefined,
};
const warnings: string[] = [];
const generator = new KtxDescriptionGenerator({
llmProvider: createLlmProvider('Orders mart'),
onWarning: (warning) => warnings.push(warning.code),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const description = await generator.generateTableDescription({
connectionId: 'conn-1',
connector: samplerWithoutTable,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
table: {
catalog: null,
db: 'public',
name: 'mart_orders',
columns: [{ name: 'order_id', nativeType: 'uuid' }],
},
});
expect(description).toBe('Orders mart');
expect(warnings).toEqual(['connector_capability_missing', 'description_fallback_used']);
});
it('aborts retry loop when the scan context signal fires', async () => {
const controller = new AbortController();
const sampleTable = vi.fn<NonNullable<KtxScanConnector['sampleTable']>>().mockImplementation(async () => {
controller.abort();
throw new Error('first attempt blew up');
});
const connector: KtxScanConnector = {
...createConnector(),
sampleTable,
};
const warnings: string[] = [];
const generator = new KtxDescriptionGenerator({
llmProvider: createLlmProvider('should not be called'),
onWarning: (warning) => warnings.push(warning.code),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
await expect(
generator.generateTableDescription({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1', signal: controller.signal },
dataSourceType: 'POSTGRESQL',
table: { catalog: null, db: 'public', name: 'orders' },
}),
).rejects.toThrow('aborted');
expect(sampleTable).toHaveBeenCalledTimes(1);
expect(warnings).toEqual([]);
});
it('generates column descriptions from rawDescriptions when sampleColumn is unavailable', async () => {
const samplerWithoutColumn: KtxScanConnector = {
...createConnector(),
sampleColumn: undefined,
};
const logger = createLogger();
const generator = new KtxDescriptionGenerator({
llmProvider: createLlmProvider('Payment lifecycle state'),
logger,
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateColumnDescriptions({
connectionId: 'conn-1',
connector: samplerWithoutColumn,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [{ name: 'status', rawDescriptions: { db: 'order lifecycle state' } }],
},
});
expect(result.columnDescriptions).toEqual([['status', 'Payment lifecycle state']]);
expect(logger.warn).toHaveBeenCalled();
const userPrompt = (
vi.mocked(generateText).mock.calls.at(-1)?.[0] as { messages: Array<{ role: string; content: string }> }
).messages.find((message) => message.role === 'user')?.content;
expect(userPrompt).toContain('<sample_values> unavailable </sample_values>');
expect(userPrompt).toContain('<db_documentation> order lifecycle state </db_documentation>');
});
it('generates column descriptions from rawDescriptions when sampleColumn retries exhaust', async () => {
const sampleColumn = vi
.fn<NonNullable<KtxScanConnector['sampleColumn']>>()
.mockRejectedValue(new Error('pool: connection refused'));
const flakyConnector: KtxScanConnector = {
...createConnector(),
sampleColumn,
};
const generator = new KtxDescriptionGenerator({
llmProvider: createLlmProvider('Customer reference identifier'),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateColumnDescriptions({
connectionId: 'conn-1',
connector: flakyConnector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [{ name: 'customer_id', rawDescriptions: { db: 'FK to customers.id' } }],
},
});
expect(sampleColumn).toHaveBeenCalledTimes(3);
expect(result.columnDescriptions).toEqual([['customer_id', 'Customer reference identifier']]);
});
it('skips column LLM call only when neither samples nor rawDescriptions are available', async () => {
const sampleColumn = vi
.fn<NonNullable<KtxScanConnector['sampleColumn']>>()
.mockResolvedValue({ values: [null, null], nullCount: 2, distinctCount: 0 });
const connector: KtxScanConnector = {
...createConnector(),
sampleColumn,
};
vi.mocked(generateText).mockClear();
const generator = new KtxDescriptionGenerator({
llmProvider: createLlmProvider('should not be called'),
settings: { columnMaxWords: 12, tableMaxWords: 18, dataSourceMaxWords: 24 },
});
const result = await generator.generateColumnDescriptions({
connectionId: 'conn-1',
connector,
context: { runId: 'run-1' },
dataSourceType: 'POSTGRESQL',
supportsNestedAnalysis: false,
table: {
catalog: null,
db: 'public',
name: 'orders',
columns: [{ name: 'opaque_blob' }],
},
});
expect(result.columnDescriptions).toEqual([['opaque_blob', null]]);
expect(generateText).not.toHaveBeenCalled();
});
});

View file

@ -5,11 +5,18 @@ import type {
KtxColumnSampleResult,
KtxScanContext,
KtxScanLoggerPort,
KtxScanWarning,
KtxTableRef,
KtxTableSampleInput,
KtxTableSampleResult,
} from './types.js';
interface KtxDescriptionTableColumn {
name: string;
nativeType?: string | null;
comment?: string | null;
}
export interface KtxDescriptionCachePort {
buildTableKey(table: KtxTableRef): string;
buildColumnKey(table: KtxTableRef, columnName: string): string;
@ -53,6 +60,7 @@ export interface KtxDescriptionColumnTable extends KtxTableRef {
export interface KtxDescriptionTableInput extends KtxTableRef {
rawDescriptions?: Record<string, string>;
columns?: KtxDescriptionTableColumn[];
}
export interface KtxColumnAnalysisResult {
@ -72,7 +80,8 @@ export interface KtxColumnDescriptionPromptInput {
export interface KtxTableDescriptionPromptInput {
tableName: string;
sampleData: KtxTableSampleResult;
sampleData?: KtxTableSampleResult;
columns?: KtxDescriptionTableColumn[];
dataSourceType: string;
rawDescriptions?: Record<string, string>;
}
@ -114,6 +123,7 @@ export interface KtxDescriptionGeneratorOptions {
llmProvider: KtxLlmProvider;
cache?: KtxDescriptionCachePort;
logger?: KtxScanLoggerPort;
onWarning?: (warning: KtxScanWarning) => void;
settings: KtxDescriptionGenerationSettings;
}
@ -136,6 +146,66 @@ function errorMessage(error: unknown): string {
return error instanceof Error ? error.message : String(error);
}
class KtxAbortedError extends Error {
constructor() {
super('aborted');
this.name = 'KtxAbortedError';
}
}
async function delayWithAbort(ms: number, signal?: AbortSignal): Promise<void> {
if (!signal) {
await new Promise<void>((resolve) => setTimeout(resolve, ms));
return;
}
if (signal.aborted) {
throw new KtxAbortedError();
}
await new Promise<void>((resolve, reject) => {
const timer = setTimeout(() => {
signal.removeEventListener('abort', onAbort);
resolve();
}, ms);
const onAbort = (): void => {
clearTimeout(timer);
reject(new KtxAbortedError());
};
signal.addEventListener('abort', onAbort, { once: true });
});
}
interface RetryAsyncOptions {
attempts: number;
baseDelayMs: number;
signal?: AbortSignal;
onAttemptFailure?: (error: unknown, attempt: number) => void;
}
async function retryAsync<T>(fn: () => Promise<T>, options: RetryAsyncOptions): Promise<T> {
const attempts = Math.max(1, options.attempts);
let lastError: unknown;
for (let attempt = 1; attempt <= attempts; attempt += 1) {
if (options.signal?.aborted) {
throw new KtxAbortedError();
}
try {
return await fn();
} catch (error) {
lastError = error;
if (error instanceof KtxAbortedError) {
throw error;
}
options.onAttemptFailure?.(error, attempt);
if (attempt === attempts) {
break;
}
const delay = options.baseDelayMs * 2 ** (attempt - 1);
await delayWithAbort(delay, options.signal);
}
}
throw lastError;
}
function toTableRef(table: KtxTableRef): KtxTableRef {
return {
catalog: table.catalog,
@ -205,11 +275,12 @@ Example:
systemParts.push(wordLimitLine(input.maxWords));
}
const sampleValuesContent = valuesStr.length > 0 ? valuesStr : 'unavailable';
let user = `<table_context> ${input.tableContext} </table_context>
<column_name> ${input.columnName} </column_name>
<sample_values> ${valuesStr} </sample_values>
<sample_values> ${sampleValuesContent} </sample_values>
`;
const sources = descriptionSources(input.rawDescriptions);
@ -228,16 +299,6 @@ Example:
export function buildKtxTableDescriptionPrompt(
input: KtxTableDescriptionPromptInput & { maxWords?: number },
): KtxDescriptionPrompt {
const columnInfo: string[] = [];
for (let index = 0; index < Math.min(input.sampleData.headers.length, 10); index += 1) {
const header = input.sampleData.headers[index];
const sampleValues = input.sampleData.rows
.slice(0, 3)
.map((row) => row[index])
.filter((value) => value !== null && value !== undefined);
columnInfo.push(`${header}: ${sampleValues.map((value) => String(value)).join(', ')}`);
}
const systemParts: string[] = [
`Analyze database tables and provide a concise description.
@ -256,9 +317,38 @@ Example: "Information about healthcare professionals used for workforce manageme
systemParts.push(wordLimitLine(input.maxWords));
}
const hasSamples = !!input.sampleData && input.sampleData.rows.length > 0;
let columnsLine: string;
let rowsLine: string;
if (hasSamples) {
const sampleData = input.sampleData!;
const columnInfo: string[] = [];
for (let index = 0; index < Math.min(sampleData.headers.length, 10); index += 1) {
const header = sampleData.headers[index];
const sampleValues = sampleData.rows
.slice(0, 3)
.map((row) => row[index])
.filter((value) => value !== null && value !== undefined);
columnInfo.push(`${header}: ${sampleValues.map((value) => String(value)).join(', ')}`);
}
columnsLine = `Columns and sample data: ${columnInfo.join(' | ')}`;
rowsLine = `Total rows in sample: ${sampleData.rows.length}`;
} else if (input.columns && input.columns.length > 0) {
const columnInfo = input.columns.slice(0, 30).map((column) => {
const typePart = column.nativeType ? ` (${column.nativeType})` : '';
const commentPart = column.comment ? `${column.comment}` : '';
return `${column.name}${typePart}${commentPart}`;
});
columnsLine = `Columns (metadata only, no sample rows): ${columnInfo.join(' | ')}`;
rowsLine = 'Sample rows: unavailable';
} else {
columnsLine = 'Columns: unavailable';
rowsLine = 'Sample rows: unavailable';
}
let user = `Table: ${input.tableName}
Columns and sample data: ${columnInfo.join(' | ')}
Total rows in sample: ${input.sampleData.rows.length}
${columnsLine}
${rowsLine}
Data source type: ${input.dataSourceType}`;
const sources = descriptionSources(input.rawDescriptions);
@ -313,12 +403,14 @@ export class KtxDescriptionGenerator {
private readonly llmProvider: KtxLlmProvider;
private readonly cache?: KtxDescriptionCachePort;
private readonly logger?: KtxScanLoggerPort;
private readonly onWarning?: (warning: KtxScanWarning) => void;
private readonly settings: ResolvedKtxDescriptionGenerationSettings;
constructor(options: KtxDescriptionGeneratorOptions) {
this.llmProvider = options.llmProvider;
this.cache = options.cache;
this.logger = options.logger;
this.onWarning = options.onWarning;
this.settings = {
columnMaxWords: options.settings.columnMaxWords,
tableMaxWords: options.settings.tableMaxWords,
@ -366,26 +458,82 @@ export class KtxDescriptionGenerator {
}
}
if (!input.connector.sampleTable) {
this.logger?.warn('KTX scan connector does not support table sampling for table description generation', {
const sampleTable = input.connector.sampleTable;
let sampleData: KtxTableSampleResult | null = null;
let fallbackReason: 'capability_missing' | 'sampling_failed' | 'empty_sample' | null = null;
if (!sampleTable) {
fallbackReason = 'capability_missing';
this.logger?.warn('KTX scan connector does not support table sampling; falling back to metadata-only prompt', {
connectorId: input.connector.id,
table: input.table.name,
});
return null;
this.onWarning?.({
code: 'connector_capability_missing',
message: `Connector ${input.connector.id} does not support sampleTable; using metadata-only description prompt`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, capability: 'sampleTable' },
});
} else {
try {
sampleData = await retryAsync(
() =>
sampleTable(
{
connectionId: input.connectionId,
table: tableRef,
limit: 20,
},
input.context,
),
{
attempts: 3,
baseDelayMs: 200,
signal: input.context.signal,
onAttemptFailure: (error, attempt) => {
this.logger?.warn(
`sampleTable attempt ${attempt} failed for ${input.table.name}: ${errorMessage(error)}`,
{
connectorId: input.connector.id,
table: input.table.name,
attempt,
},
);
},
},
);
if (sampleData.rows.length === 0) {
fallbackReason = 'empty_sample';
this.logger?.warn('sampleTable returned no rows; using metadata-only prompt', {
connectorId: input.connector.id,
table: input.table.name,
});
}
} catch (error) {
if (error instanceof KtxAbortedError) {
throw error;
}
fallbackReason = 'sampling_failed';
this.logger?.error(`sampleTable exhausted retries for ${input.table.name}: ${errorMessage(error)}`, {
connectorId: input.connector.id,
table: input.table.name,
});
this.onWarning?.({
code: 'sampling_failed',
message: `Failed to sample table ${input.table.name} after retries: ${errorMessage(error)}`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, error: errorMessage(error) },
});
}
}
try {
const sampleData = await input.connector.sampleTable(
{
connectionId: input.connectionId,
table: tableRef,
limit: 20,
},
input.context,
);
const prompt = buildKtxTableDescriptionPrompt({
tableName: input.table.name,
sampleData,
...(fallbackReason === null && sampleData ? { sampleData } : {}),
...(input.table.columns && input.table.columns.length > 0 ? { columns: input.table.columns } : {}),
dataSourceType: input.dataSourceType,
rawDescriptions: input.table.rawDescriptions,
maxWords: this.settings.tableMaxWords,
@ -394,9 +542,37 @@ export class KtxDescriptionGenerator {
if (cacheKey && description) {
await this.cache?.set(cacheKey, description);
}
if (description && fallbackReason !== null) {
this.onWarning?.({
code: 'description_fallback_used',
message: `Generated table description without sample rows for ${input.table.name} (reason: ${fallbackReason})`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, reason: fallbackReason },
});
}
if (!description) {
this.onWarning?.({
code: 'enrichment_failed',
message: `Failed to generate description for table ${input.table.name}`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id, usedFallback: fallbackReason !== null },
});
}
return description;
} catch (error) {
this.logger?.error(`Error generating table description: ${errorMessage(error)}`);
this.logger?.error(`Error generating table description: ${errorMessage(error)}`, {
connectorId: input.connector.id,
table: input.table.name,
});
this.onWarning?.({
code: 'enrichment_failed',
message: `Failed to generate description for table ${input.table.name}: ${errorMessage(error)}`,
table: input.table.name,
recoverable: true,
metadata: { connectorId: input.connector.id },
});
return null;
}
}
@ -496,33 +672,64 @@ export class KtxDescriptionGenerator {
let columnValues = column.sampleValues;
if (!columnValues || columnValues.length === 0) {
if (!input.connector.sampleColumn) {
this.logger?.warn('KTX scan connector does not support column sampling for column description generation', {
this.logger?.warn('KTX scan connector does not support column sampling; using available metadata only', {
connectorId: input.connector.id,
table: input.table.name,
column: column.name,
});
return {
columnName: column.name,
description: null,
skipped: false,
processed: false,
};
columnValues = [];
} else {
const sampleColumn = input.connector.sampleColumn;
try {
const sample = await retryAsync(
() =>
sampleColumn(
{
connectionId: input.connectionId,
table: tableRef,
column: column.name,
limit: 50,
},
input.context,
),
{
attempts: 3,
baseDelayMs: 200,
signal: input.context.signal,
onAttemptFailure: (error, attempt) => {
this.logger?.warn(
`sampleColumn attempt ${attempt} failed for ${input.table.name}.${column.name}: ${errorMessage(error)}`,
{
connectorId: input.connector.id,
table: input.table.name,
column: column.name,
attempt,
},
);
},
},
);
columnValues = sample.values;
} catch (error) {
if (error instanceof KtxAbortedError) {
throw error;
}
this.logger?.warn(
`sampleColumn exhausted retries for ${input.table.name}.${column.name}; using available metadata only: ${errorMessage(error)}`,
{
connectorId: input.connector.id,
table: input.table.name,
column: column.name,
},
);
columnValues = [];
}
}
const sample = await input.connector.sampleColumn(
{
connectionId: input.connectionId,
table: tableRef,
column: column.name,
limit: 50,
},
input.context,
);
columnValues = sample.values;
}
const nonNullValues = (columnValues ?? []).filter((value) => value !== null && value !== undefined);
if (nonNullValues.length === 0) {
const hasRawDescriptions = descriptionSources(column.rawDescriptions).length > 0;
if (nonNullValues.length === 0 && !hasRawDescriptions) {
return {
columnName: column.name,
description: null,
@ -553,7 +760,14 @@ export class KtxDescriptionGenerator {
processed: description !== null,
};
} catch (error) {
this.logger?.error(`Error analyzing column '${column.name}': ${errorMessage(error)}`);
if (error instanceof KtxAbortedError) {
throw error;
}
this.logger?.error(`Error analyzing column '${column.name}': ${errorMessage(error)}`, {
connectorId: input.connector.id,
table: input.table.name,
column: column.name,
});
return {
columnName: column.name,
description: null,

View file

@ -404,6 +404,41 @@ describe('local scan enrichment', () => {
expect(result.resolvedRelationships).toBeNull();
});
it('forwards context.logger and emits warnings when sampleTable fails repeatedly', async () => {
const failingConnector: KtxScanConnector = {
...connector(),
sampleTable: vi.fn(async () => {
throw new Error('pool: ECONNRESET');
}),
};
const logger = {
debug: vi.fn(),
info: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',
mode: 'enriched',
detectRelationships: false,
connector: failingConnector,
context: { runId: 'scan-run-warnings', logger },
providers: createDeterministicLocalScanEnrichmentProviders({ embeddingDimensions: 6 }),
});
const codes = result.warnings.map((warning) => warning.code);
expect(codes).toContain('sampling_failed');
expect(codes).toContain('description_fallback_used');
expect(result.warnings.some((warning) => warning.table === 'customers')).toBe(true);
expect(logger.warn).toHaveBeenCalled();
expect(logger.error).toHaveBeenCalled();
// Each of the two tables produced sampling_failed + description_fallback_used, so 2 + 2 = 4 warnings minimum.
expect(result.warnings.length).toBeGreaterThanOrEqual(4);
// Sampling was retried 3× for each of the 2 tables = 6 calls
expect(failingConnector.sampleTable).toHaveBeenCalledTimes(6);
});
it('runs configured deterministic enrichment with descriptions and embeddings', async () => {
const result = await runLocalScanEnrichment({
connectionId: 'warehouse',

View file

@ -298,6 +298,18 @@ function descriptionTable(table: KtxSchemaTable): KtxDescriptionColumnTable {
};
}
function tableMetadataColumns(table: KtxSchemaTable): Array<{
name: string;
nativeType?: string | null;
comment?: string | null;
}> {
return table.columns.map((column) => ({
name: column.name,
nativeType: column.nativeType ?? null,
comment: column.comment ?? null,
}));
}
function embeddingBatchSize(maxBatchSize: number): number {
return Number.isInteger(maxBatchSize) && maxBatchSize > 0 ? maxBatchSize : 100;
}
@ -308,9 +320,19 @@ async function generateDescriptions(input: {
context: KtxScanContext;
providers: KtxLocalScanEnrichmentProviders;
progress?: KtxProgressPort;
warnings?: KtxScanWarning[];
}): Promise<KtxLocalScanEnrichmentResult['descriptionUpdates']> {
const warningSink = input.warnings;
const generator = new KtxDescriptionGenerator({
llmProvider: input.providers.llm,
...(input.context.logger ? { logger: input.context.logger } : {}),
...(warningSink
? {
onWarning: (warning: KtxScanWarning) => {
warningSink.push(warning);
},
}
: {}),
settings: {
columnMaxWords: 16,
tableMaxWords: 24,
@ -355,6 +377,7 @@ async function generateDescriptions(input: {
db: table.db,
name: table.name,
rawDescriptions: table.comment ? { db: table.comment } : {},
columns: tableMetadataColumns(table),
},
});
return {
@ -560,6 +583,7 @@ export async function runLocalScanEnrichment(
context: input.context,
providers,
progress: descriptionProgress,
warnings,
}),
});
const embeddingProgress = progress?.startPhase(0.2);

View file

@ -166,11 +166,11 @@ describe('relationship LLM proposals', () => {
});
expect(generateText).toHaveBeenCalledWith(
expect.objectContaining({
system: expect.objectContaining({
role: 'system',
content: expect.stringContaining('You are helping KTX review possible SQL relationships'),
}),
messages: expect.arrayContaining([
expect.objectContaining({
role: 'system',
content: expect.stringContaining('You are helping KTX review possible SQL relationships'),
}),
expect.objectContaining({
role: 'user',
content: expect.stringContaining('"tables"'),
@ -178,9 +178,12 @@ describe('relationship LLM proposals', () => {
]),
}),
);
const call = (generateText.mock.calls as unknown as Array<[{ messages: Array<{ role: string; content: string }> }]>)[0]?.[0];
const call = (
generateText.mock.calls as unknown as Array<[{ messages: Array<{ role: string; content: string }> }]>
)[0]?.[0];
const userMessage = call?.messages.find((m) => m.role === 'user');
expect(userMessage?.content).not.toContain('You are helping KTX review possible SQL relationships');
expect(call?.messages.some((m) => m.role === 'system')).toBe(false);
});
it('skips deterministic providers without calling generateText', async () => {

View file

@ -345,7 +345,8 @@ export type KtxScanWarningCode =
| 'relationship_llm_invalid_reference'
| 'relationship_llm_proposal_failed'
| 'credential_redacted'
| 'enrichment_failed';
| 'enrichment_failed'
| 'description_fallback_used';
export interface KtxScanWarning {
code: KtxScanWarningCode;

View file

@ -1,6 +1,7 @@
export { createKtxEmbeddingProvider } from './embedding-provider.js';
export { runKtxEmbeddingHealthCheck } from './embedding-health.js';
export { KtxMessageBuilder } from './message-builder.js';
export { KtxMessageBuilder, splitKtxSystemMessages } from './message-builder.js';
export type { KtxSplitSystemMessagesResult } from './message-builder.js';
export type { KtxEmbeddingHealthCheckOptions, KtxEmbeddingHealthCheckResult } from './embedding-health.js';
export type { KtxEmbeddingProviderDeps } from './embedding-provider.js';
export type { KtxLlmHealthCheckDeps, KtxLlmHealthCheckOptions, KtxLlmHealthCheckResult } from './model-health.js';

View file

@ -1,6 +1,6 @@
import type { ModelMessage } from 'ai';
import { describe, expect, it } from 'vitest';
import { KtxMessageBuilder } from './message-builder.js';
import { KtxMessageBuilder, splitKtxSystemMessages } from './message-builder.js';
import { createKtxLlmProvider } from './model-provider.js';
function makeBuilder(overrides: Parameters<typeof createKtxLlmProvider>[0]['promptCaching'] = {}) {
@ -111,3 +111,36 @@ describe('KtxMessageBuilder.build', () => {
expect((out.tools.z as { providerOptions: any }).providerOptions.anthropic.cacheControl.ttl).toBe('5m');
});
});
describe('splitKtxSystemMessages', () => {
it('returns undefined system when no system messages are present', () => {
const split = splitKtxSystemMessages([
{ role: 'user', content: 'hello' },
{ role: 'assistant', content: 'hi' },
]);
expect(split.system).toBeUndefined();
expect(split.messages).toHaveLength(2);
});
it('returns a single system message object when one system message is present, preserving providerOptions', () => {
const systemMessage = {
role: 'system' as const,
content: 'You are helpful.',
providerOptions: { anthropic: { cacheControl: { type: 'ephemeral' } } },
};
const split = splitKtxSystemMessages([systemMessage, { role: 'user', content: 'hello' }]);
expect(split.system).toBe(systemMessage);
expect(split.messages).toEqual([{ role: 'user', content: 'hello' }]);
});
it('returns an array of system messages when multiple are present, in order', () => {
const split = splitKtxSystemMessages([
{ role: 'system', content: 'cached' },
{ role: 'system', content: 'fresh' },
{ role: 'user', content: 'hello' },
]);
expect(Array.isArray(split.system)).toBe(true);
expect(split.system).toHaveLength(2);
expect(split.messages).toEqual([{ role: 'user', content: 'hello' }]);
});
});

View file

@ -1,7 +1,29 @@
import type { LanguageModel, ModelMessage, ToolSet } from 'ai';
import type { LanguageModel, ModelMessage, SystemModelMessage, ToolSet } from 'ai';
import { isAnthropicProtocolModel } from './model-provider.js';
import type { KtxLlmProvider, KtxPromptCacheTtl, KtxPromptParts } from './types.js';
export interface KtxSplitSystemMessagesResult {
system: SystemModelMessage | SystemModelMessage[] | undefined;
messages: ModelMessage[];
}
export function splitKtxSystemMessages(messages: readonly ModelMessage[]): KtxSplitSystemMessagesResult {
const systemMessages: SystemModelMessage[] = [];
const otherMessages: ModelMessage[] = [];
for (const message of messages) {
if (message.role === 'system') {
systemMessages.push(message);
} else {
otherMessages.push(message);
}
}
return {
system:
systemMessages.length === 0 ? undefined : systemMessages.length === 1 ? systemMessages[0] : systemMessages,
messages: otherMessages,
};
}
type ToolMap = ToolSet | Record<string, Record<string, unknown>>;
interface KtxMessageBuilderOptions {