ktx/packages/llm/src/message-builder.ts
Andrey Avtomonov cb8902f1e5
fix(context): merge overlay columns onto manifest columns by name (#94)
* fix(context): merge overlay columns onto manifest columns by name

composeOverlay was appending overlay columns to the manifest column list,
producing duplicate entries when dbt/metabase overlays declared a column
just to attach descriptions. The duplicates carried no `type`, so the
pydantic SourceDefinition rejected them at semantic-query time and broke
`ktx sl query` for every overlay-backed measure. Now overlay columns
match base columns by name (case-insensitive): same-name entries merge
onto the manifest (overlay fields win, type/role fall back to the base,
descriptions merge per source key) and only new names append.

* refactor(sl): split overlay columns from column_overrides and enforce TS/Python wire contract

Overlay sources now have two distinct collections: `columns:` for computed
columns (requiring `expr` + `type`) and `column_overrides:` for metadata
patches to inherited manifest columns. Composing or loading an overlay that
mixes the two — or references an unknown column — fails with a typed error.

Introduce `ResolvedSemanticLayerSource` / `resolvedSourceSchema` /
`toResolvedWire` as the strict shape sent to the Python engine, and add a
schema contract test that diffs Zod against the Pydantic JSON schema dumped
by `python -m semantic_layer dump-schema`. `SourceDefinition` is now
`extra="forbid"` on the Python side.

`loadAllSources` surfaces per-file load errors instead of swallowing them,
so validation/query paths can report manifest shard parse failures.

* fix(context): make scan description generation resilient and quiet

A transient sampleTable failure during ingest used to take out every
table in a connection: generateTableDescription returned a hardcoded
'Table not found' string into descriptions.ai, and KtxDescriptionGenerator
was constructed without a logger, so the failure left no trail anywhere.

- sampleTable / sampleColumn calls retry 3x with 200/400/800ms backoff,
  honouring KtxScanContext.signal via a new KtxAbortedError.
- On retry exhaustion or missing capability, table generation falls back
  to a metadata-only prompt built from column name / native type / comment
  / rawDescriptions. The column path follows the same rule -- call the
  LLM when any of samples or rawDescriptions are available; skip only
  when both are absent.
- Logger is now threaded from KtxScanContext into the generator. Failures
  emit structured KtxScanWarning entries (new description_fallback_used
  code, plus existing sampling_failed / enrichment_failed /
  connector_capability_missing). ktx scan groups warnings by code so a
  batch of identical failures collapses to one summary line plus sample.
- Returns null on failure instead of the 'Table not found' sentinel; the
  manifest writer's existing guard already skips empty descriptions, so
  schema YAML no longer carries misleading text. SCAN_MANAGED_DESCRIPTION_KEYS
  already strips stale 'ai' on merge, so existing YAML clears on next run.

Also suppress AI SDK v6 'system in messages' warning: pull system messages
out of KtxMessageBuilder.wrapSimple's output via a new splitKtxSystemMessages
helper and pass them top-level to generateText (preserves cacheControl
providerOptions on the SystemModelMessage). Agent-runner's local
splitSystemPromptMessages dedupes onto the shared helper.

* test(docs): align examples-docs assertions with revamped docs

PR #103 (setup/guide doc revamp) reworded several CLI examples and
connection labels; the assertions in scripts/examples-docs.test.mjs
still referenced the pre-revamp wording and were failing in CI on main.
Update the regexes to match the post-revamp content:

- drop the `--json` flag from the sl-query example expectation
- move the `Driver:` / `Status: ok` probe to the connection reference,
  which is where that output now lives (driver id is lowercase
  `postgres`, not the display name `PostgreSQL`)
- drop the obsolete `Install \`uv\`...` troubleshooting line
- accept `<connectionId>` everywhere; the docs no longer use the
  hyphenated `<connection-id>` form
- match the `warehouse` connection id used in the quickstart instead of
  the `postgres-warehouse` id only used in the README and setup ref

* fix(sl): skip TS/Python schema contract test when uv is unavailable

The TypeScript checks CI job does not install uv or Python, so the
module-level `execFileSync('uv', ...)` in schemas.contract.test.ts threw
ENOENT and failed the suite. Wrap the schema dump in a try/catch and
guard the describe block with `describe.skipIf` so the test skips in
environments without uv. Local dev and any CI job that has uv on PATH
still runs the cross-language contract assertion.
2026-05-15 02:11:04 +02:00

219 lines
7 KiB
TypeScript

import type { LanguageModel, ModelMessage, SystemModelMessage, ToolSet } from 'ai';
import { isAnthropicProtocolModel } from './model-provider.js';
import type { KtxLlmProvider, KtxPromptCacheTtl, KtxPromptParts } from './types.js';
export interface KtxSplitSystemMessagesResult {
system: SystemModelMessage | SystemModelMessage[] | undefined;
messages: ModelMessage[];
}
export function splitKtxSystemMessages(messages: readonly ModelMessage[]): KtxSplitSystemMessagesResult {
const systemMessages: SystemModelMessage[] = [];
const otherMessages: ModelMessage[] = [];
for (const message of messages) {
if (message.role === 'system') {
systemMessages.push(message);
} else {
otherMessages.push(message);
}
}
return {
system:
systemMessages.length === 0 ? undefined : systemMessages.length === 1 ? systemMessages[0] : systemMessages,
messages: otherMessages,
};
}
type ToolMap = ToolSet | Record<string, Record<string, unknown>>;
interface KtxMessageBuilderOptions {
cacheSystem?: boolean;
cacheTools?: boolean;
cacheLastHistory?: boolean;
}
interface KtxBuildInput {
parts: KtxPromptParts;
history: ModelMessage[];
currentMessage: ModelMessage;
tools: ToolMap;
model: LanguageModel | string;
}
interface KtxWrapSimpleInput {
system?: string;
messages?: ModelMessage[];
tools?: ToolMap;
model: LanguageModel | string;
}
interface KtxBuildOutput {
messages: ModelMessage[];
tools: ToolMap;
}
export class KtxMessageBuilder {
constructor(
private readonly provider: KtxLlmProvider,
private readonly options: KtxMessageBuilderOptions = {},
) {}
build(input: KtxBuildInput): KtxBuildOutput {
const cfg = this.provider.promptCachingConfig();
const cachingActive = cfg.enabled && isAnthropicProtocolModel(input.model);
const ttls = this.resolveTtls(input.model);
const messages: ModelMessage[] = [];
const systemMessage: ModelMessage & { providerOptions?: unknown } = {
role: 'system',
content: input.parts.staticSystem,
};
if (cachingActive && this.cacheSystemEnabled()) {
systemMessage.providerOptions = this.provider.cacheMarker(ttls.systemTtl, input.model);
}
messages.push(systemMessage);
if (input.parts.dynamicSystem) {
messages.push({ role: 'system', content: input.parts.dynamicSystem });
}
const historyToEmit =
cachingActive && this.cacheHistoryEnabled()
? this.markLastHistoryMessage(input.history, ttls.historyTtl, input.model)
: input.history;
messages.push(...historyToEmit);
messages.push(this.wrapLeading(input.currentMessage, input.parts.leadingUserContext));
return {
messages,
tools: this.sortAndMarkTools(input.tools, cachingActive, this.cacheToolsEnabled(), ttls.toolsTtl, input.model),
};
}
wrapSimple(input: KtxWrapSimpleInput): KtxBuildOutput {
const cfg = this.provider.promptCachingConfig();
const cachingActive = cfg.enabled && isAnthropicProtocolModel(input.model);
const ttls = this.resolveTtls(input.model);
const messages: ModelMessage[] = [];
if (input.system) {
const systemMessage: ModelMessage & { providerOptions?: unknown } = {
role: 'system',
content: input.system,
};
if (cachingActive && this.cacheSystemEnabled()) {
systemMessage.providerOptions = this.provider.cacheMarker(ttls.systemTtl, input.model);
}
messages.push(systemMessage);
}
if (input.messages) {
messages.push(
...(cachingActive && this.cacheHistoryEnabled()
? this.markLastHistoryMessage(input.messages, ttls.historyTtl, input.model)
: input.messages),
);
}
return {
messages,
tools: this.sortAndMarkTools(input.tools ?? {}, cachingActive, this.cacheToolsEnabled(), ttls.toolsTtl, input.model),
};
}
private cacheSystemEnabled(): boolean {
return this.options.cacheSystem ?? this.provider.promptCachingConfig().cacheSystem;
}
private cacheToolsEnabled(): boolean {
return this.options.cacheTools ?? this.provider.promptCachingConfig().cacheTools;
}
private cacheHistoryEnabled(): boolean {
return this.options.cacheLastHistory ?? this.provider.promptCachingConfig().cacheHistory;
}
private resolveTtls(model: LanguageModel | string): {
systemTtl: KtxPromptCacheTtl;
toolsTtl: KtxPromptCacheTtl;
historyTtl: KtxPromptCacheTtl;
} {
const cfg = this.provider.promptCachingConfig();
if (cfg.vertexFallbackTo5m && this.provider.activeBackend() === 'vertex' && isAnthropicProtocolModel(model)) {
return { systemTtl: '5m', toolsTtl: '5m', historyTtl: '5m' };
}
return { systemTtl: cfg.systemTtl, toolsTtl: cfg.toolsTtl, historyTtl: cfg.historyTtl };
}
private wrapLeading(currentMessage: ModelMessage, leadingUserContext?: string): ModelMessage {
if (!leadingUserContext) {
return currentMessage;
}
const reminderPart = {
type: 'text' as const,
text: `<system-reminder>\n${leadingUserContext}\n</system-reminder>`,
};
if (typeof currentMessage.content === 'string') {
return {
...currentMessage,
content: [reminderPart, { type: 'text' as const, text: currentMessage.content }],
} as ModelMessage;
}
if (Array.isArray(currentMessage.content)) {
return { ...currentMessage, content: [reminderPart, ...currentMessage.content] } as ModelMessage;
}
return currentMessage;
}
private markLastHistoryMessage(
history: ModelMessage[],
ttl: KtxPromptCacheTtl,
model: LanguageModel | string,
): ModelMessage[] {
if (history.length === 0) {
return history;
}
const out = [...history];
const last = out[out.length - 1];
const marker = this.provider.cacheMarker(ttl, model);
if (!marker) {
return history;
}
if (typeof last.content === 'string') {
out[out.length - 1] = {
...last,
content: [{ type: 'text', text: last.content, providerOptions: marker }],
} as ModelMessage;
return out;
}
if (Array.isArray(last.content) && last.content.length > 0) {
const parts = [...last.content];
const lastPart = parts[parts.length - 1];
parts[parts.length - 1] = Object.assign({}, lastPart, { providerOptions: marker });
out[out.length - 1] = { ...last, content: parts } as ModelMessage;
}
return out;
}
private sortAndMarkTools(
tools: ToolMap,
cachingActive: boolean,
cacheTools: boolean,
ttl: KtxPromptCacheTtl,
model: LanguageModel | string,
): ToolMap {
const keys = Object.keys(tools).sort();
const sorted: Record<string, unknown> = {};
for (const key of keys) {
sorted[key] = tools[key as keyof typeof tools];
}
if (cachingActive && cacheTools && keys.length > 0) {
const lastKey = keys[keys.length - 1];
const marker = this.provider.cacheMarker(ttl, model);
if (marker) {
sorted[lastKey] = { ...(sorted[lastKey] as Record<string, unknown>), providerOptions: marker };
}
}
return sorted as ToolMap;
}
}