ktx/packages/llm/src/message-builder.ts

220 lines
7 KiB
TypeScript

fix(context): merge overlay columns onto manifest columns by name (#94) * fix(context): merge overlay columns onto manifest columns by name composeOverlay was appending overlay columns to the manifest column list, producing duplicate entries when dbt/metabase overlays declared a column just to attach descriptions. The duplicates carried no `type`, so the pydantic SourceDefinition rejected them at semantic-query time and broke `ktx sl query` for every overlay-backed measure. Now overlay columns match base columns by name (case-insensitive): same-name entries merge onto the manifest (overlay fields win, type/role fall back to the base, descriptions merge per source key) and only new names append. * refactor(sl): split overlay columns from column_overrides and enforce TS/Python wire contract Overlay sources now have two distinct collections: `columns:` for computed columns (requiring `expr` + `type`) and `column_overrides:` for metadata patches to inherited manifest columns. Composing or loading an overlay that mixes the two — or references an unknown column — fails with a typed error. Introduce `ResolvedSemanticLayerSource` / `resolvedSourceSchema` / `toResolvedWire` as the strict shape sent to the Python engine, and add a schema contract test that diffs Zod against the Pydantic JSON schema dumped by `python -m semantic_layer dump-schema`. `SourceDefinition` is now `extra="forbid"` on the Python side. `loadAllSources` surfaces per-file load errors instead of swallowing them, so validation/query paths can report manifest shard parse failures. * fix(context): make scan description generation resilient and quiet A transient sampleTable failure during ingest used to take out every table in a connection: generateTableDescription returned a hardcoded 'Table not found' string into descriptions.ai, and KtxDescriptionGenerator was constructed without a logger, so the failure left no trail anywhere. 
- sampleTable / sampleColumn calls retry 3x with 200/400/800ms backoff, honouring KtxScanContext.signal via a new KtxAbortedError. - On retry exhaustion or missing capability, table generation falls back to a metadata-only prompt built from column name / native type / comment / rawDescriptions. The column path follows the same rule -- call the LLM when any of samples or rawDescriptions are available; skip only when both are absent. - Logger is now threaded from KtxScanContext into the generator. Failures emit structured KtxScanWarning entries (new description_fallback_used code, plus existing sampling_failed / enrichment_failed / connector_capability_missing). ktx scan groups warnings by code so a batch of identical failures collapses to one summary line plus sample. - Returns null on failure instead of the 'Table not found' sentinel; the manifest writer's existing guard already skips empty descriptions, so schema YAML no longer carries misleading text. SCAN_MANAGED_DESCRIPTION_KEYS already strips stale 'ai' on merge, so existing YAML clears on next run. Also suppress AI SDK v6 'system in messages' warning: pull system messages out of KtxMessageBuilder.wrapSimple's output via a new splitKtxSystemMessages helper and pass them top-level to generateText (preserves cacheControl providerOptions on the SystemModelMessage). Agent-runner's local splitSystemPromptMessages dedupes onto the shared helper. * test(docs): align examples-docs assertions with revamped docs PR #103 (setup/guide doc revamp) reworded several CLI examples and connection labels; the assertions in scripts/examples-docs.test.mjs still referenced the pre-revamp wording and were failing in CI on main. 
Update the regexes to match the post-revamp content: - drop the `--json` flag from the sl-query example expectation - move the `Driver:` / `Status: ok` probe to the connection reference, which is where that output now lives (driver id is lowercase `postgres`, not the display name `PostgreSQL`) - drop the obsolete `Install \`uv\`...` troubleshooting line - accept `<connectionId>` everywhere; the docs no longer use the hyphenated `<connection-id>` form - match the `warehouse` connection id used in the quickstart instead of the `postgres-warehouse` id only used in the README and setup ref * fix(sl): skip TS/Python schema contract test when uv is unavailable The TypeScript checks CI job does not install uv or Python, so the module-level `execFileSync('uv', ...)` in schemas.contract.test.ts threw ENOENT and failed the suite. Wrap the schema dump in a try/catch and guard the describe block with `describe.skipIf` so the test skips in environments without uv. Local dev and any CI job that has uv on PATH still runs the cross-language contract assertion.
2026-05-15 02:11:04 +02:00
import type { LanguageModel, ModelMessage, SystemModelMessage, ToolSet } from 'ai';
import { isAnthropicProtocolModel } from './model-provider.js';
import type { KtxLlmProvider, KtxPromptCacheTtl, KtxPromptParts } from './types.js';
export interface KtxSplitSystemMessagesResult {
  system: SystemModelMessage | SystemModelMessage[] | undefined;
  messages: ModelMessage[];
}

export function splitKtxSystemMessages(messages: readonly ModelMessage[]): KtxSplitSystemMessagesResult {
  const systemMessages: SystemModelMessage[] = [];
  const otherMessages: ModelMessage[] = [];
  for (const message of messages) {
    if (message.role === 'system') {
      systemMessages.push(message);
    } else {
      otherMessages.push(message);
    }
  }
  return {
    system:
      systemMessages.length === 0 ? undefined : systemMessages.length === 1 ? systemMessages[0] : systemMessages,
    messages: otherMessages,
  };
}
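
The split performed by `splitKtxSystemMessages` can be sketched in isolation as follows. This is a minimal illustration, not the package's code: `SketchMessage`, `SketchSplitResult`, and `splitSystemSketch` are hypothetical stand-ins that avoid importing the `ai` types, and it assumes only `role` and `content` matter for the split.

```typescript
// Hypothetical stand-in for the AI SDK message types (assumption: only role/content are needed here).
type SketchMessage = { role: 'system' | 'user' | 'assistant'; content: string };

interface SketchSplitResult {
  // undefined when there is no system message, a single message when there is one, else an array.
  system: SketchMessage | SketchMessage[] | undefined;
  messages: SketchMessage[];
}

// Separates system messages from the rest while preserving relative order in both groups.
function splitSystemSketch(messages: readonly SketchMessage[]): SketchSplitResult {
  const system = messages.filter((m) => m.role === 'system');
  const rest = messages.filter((m) => m.role !== 'system');
  return {
    system: system.length === 0 ? undefined : system.length === 1 ? system[0] : system,
    messages: rest,
  };
}

const sketchSplit = splitSystemSketch([
  { role: 'system', content: 'You are a helpful analyst.' },
  { role: 'user', content: 'Describe the orders table.' },
]);
```

A caller could then pass `sketchSplit.system` as the top-level system prompt and `sketchSplit.messages` as the conversation, which is the usage the commit message above describes for `generateText`.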

type ToolMap = ToolSet | Record<string, Record<string, unknown>>;

interface KtxMessageBuilderOptions {
  cacheSystem?: boolean;
  cacheTools?: boolean;
  cacheLastHistory?: boolean;
}

interface KtxBuildInput {
  parts: KtxPromptParts;
  history: ModelMessage[];
  currentMessage: ModelMessage;
  tools: ToolMap;
  model: LanguageModel | string;
}

interface KtxWrapSimpleInput {
  system?: string;
  messages?: ModelMessage[];
  tools?: ToolMap;
  model: LanguageModel | string;
}

interface KtxBuildOutput {
  messages: ModelMessage[];
  tools: ToolMap;
}

export class KtxMessageBuilder {
  constructor(
    private readonly provider: KtxLlmProvider,
    private readonly options: KtxMessageBuilderOptions = {},
  ) {}

  build(input: KtxBuildInput): KtxBuildOutput {
    const cfg = this.provider.promptCachingConfig();
    const cachingActive = cfg.enabled && isAnthropicProtocolModel(input.model);
    const ttls = this.resolveTtls(input.model);
    const messages: ModelMessage[] = [];
    const systemMessage: ModelMessage & { providerOptions?: unknown } = {
      role: 'system',
      content: input.parts.staticSystem,
    };
    if (cachingActive && this.cacheSystemEnabled()) {
      systemMessage.providerOptions = this.provider.cacheMarker(ttls.systemTtl, input.model);
    }
    messages.push(systemMessage);
    if (input.parts.dynamicSystem) {
      messages.push({ role: 'system', content: input.parts.dynamicSystem });
    }
    const historyToEmit =
      cachingActive && this.cacheHistoryEnabled()
        ? this.markLastHistoryMessage(input.history, ttls.historyTtl, input.model)
        : input.history;
    messages.push(...historyToEmit);
    messages.push(this.wrapLeading(input.currentMessage, input.parts.leadingUserContext));
    return {
      messages,
      tools: this.sortAndMarkTools(input.tools, cachingActive, this.cacheToolsEnabled(), ttls.toolsTtl, input.model),
    };
  }

  wrapSimple(input: KtxWrapSimpleInput): KtxBuildOutput {
    const cfg = this.provider.promptCachingConfig();
    const cachingActive = cfg.enabled && isAnthropicProtocolModel(input.model);
    const ttls = this.resolveTtls(input.model);
    const messages: ModelMessage[] = [];
    if (input.system) {
      const systemMessage: ModelMessage & { providerOptions?: unknown } = {
        role: 'system',
        content: input.system,
      };
      if (cachingActive && this.cacheSystemEnabled()) {
        systemMessage.providerOptions = this.provider.cacheMarker(ttls.systemTtl, input.model);
      }
      messages.push(systemMessage);
    }
    if (input.messages) {
      messages.push(
        ...(cachingActive && this.cacheHistoryEnabled()
          ? this.markLastHistoryMessage(input.messages, ttls.historyTtl, input.model)
          : input.messages),
      );
    }
    return {
      messages,
      tools: this.sortAndMarkTools(input.tools ?? {}, cachingActive, this.cacheToolsEnabled(), ttls.toolsTtl, input.model),
    };
  }

  private cacheSystemEnabled(): boolean {
    return this.options.cacheSystem ?? this.provider.promptCachingConfig().cacheSystem;
  }

  private cacheToolsEnabled(): boolean {
    return this.options.cacheTools ?? this.provider.promptCachingConfig().cacheTools;
  }

  private cacheHistoryEnabled(): boolean {
    return this.options.cacheLastHistory ?? this.provider.promptCachingConfig().cacheHistory;
  }

  private resolveTtls(model: LanguageModel | string): {
    systemTtl: KtxPromptCacheTtl;
    toolsTtl: KtxPromptCacheTtl;
    historyTtl: KtxPromptCacheTtl;
  } {
    const cfg = this.provider.promptCachingConfig();
    if (cfg.vertexFallbackTo5m && this.provider.activeBackend() === 'vertex' && isAnthropicProtocolModel(model)) {
      return { systemTtl: '5m', toolsTtl: '5m', historyTtl: '5m' };
    }
    return { systemTtl: cfg.systemTtl, toolsTtl: cfg.toolsTtl, historyTtl: cfg.historyTtl };
  }

  private wrapLeading(currentMessage: ModelMessage, leadingUserContext?: string): ModelMessage {
    if (!leadingUserContext) {
      return currentMessage;
    }
    const reminderPart = {
      type: 'text' as const,
      text: `<system-reminder>\n${leadingUserContext}\n</system-reminder>`,
    };
    if (typeof currentMessage.content === 'string') {
      return {
        ...currentMessage,
        content: [reminderPart, { type: 'text' as const, text: currentMessage.content }],
      } as ModelMessage;
    }
    if (Array.isArray(currentMessage.content)) {
      return { ...currentMessage, content: [reminderPart, ...currentMessage.content] } as ModelMessage;
    }
    return currentMessage;
  }

  private markLastHistoryMessage(
    history: ModelMessage[],
    ttl: KtxPromptCacheTtl,
    model: LanguageModel | string,
  ): ModelMessage[] {
    if (history.length === 0) {
      return history;
    }
    const out = [...history];
    const last = out[out.length - 1];
    const marker = this.provider.cacheMarker(ttl, model);
    if (!marker) {
      return history;
    }
    if (typeof last.content === 'string') {
      out[out.length - 1] = {
        ...last,
        content: [{ type: 'text', text: last.content, providerOptions: marker }],
      } as ModelMessage;
      return out;
    }
    if (Array.isArray(last.content) && last.content.length > 0) {
      const parts = [...last.content];
      const lastPart = parts[parts.length - 1];
      parts[parts.length - 1] = Object.assign({}, lastPart, { providerOptions: marker });
      out[out.length - 1] = { ...last, content: parts } as ModelMessage;
    }
    return out;
  }

  private sortAndMarkTools(
    tools: ToolMap,
    cachingActive: boolean,
    cacheTools: boolean,
    ttl: KtxPromptCacheTtl,
    model: LanguageModel | string,
  ): ToolMap {
    const keys = Object.keys(tools).sort();
    const sorted: Record<string, unknown> = {};
    for (const key of keys) {
      sorted[key] = tools[key as keyof typeof tools];
    }
    if (cachingActive && cacheTools && keys.length > 0) {
      const lastKey = keys[keys.length - 1];
      const marker = this.provider.cacheMarker(ttl, model);
      if (marker) {
        sorted[lastKey] = { ...(sorted[lastKey] as Record<string, unknown>), providerOptions: marker };
      }
    }
    return sorted as ToolMap;
  }
}
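
The sort-then-mark step in `sortAndMarkTools` can be tried on its own with the sketch below. It is a simplified illustration under stated assumptions: `SketchTool`, `sortAndMarkSketch`, and `hypotheticalMarker` are invented for this example, and the marker shape is a guess at what `provider.cacheMarker` might return, not something this file confirms. Sorting by name presumably keeps the tool order stable between calls so the marked entry is always the same one.

```typescript
// Hypothetical tool shape for illustration; real tools come from the AI SDK ToolSet.
type SketchTool = { description: string; providerOptions?: Record<string, unknown> };

// Rebuilds the tool map in sorted key order and attaches the marker to the last tool only.
function sortAndMarkSketch(
  tools: Record<string, SketchTool>,
  marker: Record<string, unknown> | undefined,
): Record<string, SketchTool> {
  const entries = Object.entries(tools).sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
  const sorted: Record<string, SketchTool> = {};
  entries.forEach(([key, tool], index) => {
    const isLast = index === entries.length - 1;
    // Copy rather than mutate, so the caller's tool objects stay untouched.
    sorted[key] = marker && isLast ? { ...tool, providerOptions: marker } : tool;
  });
  return sorted;
}

// Assumed marker shape; the real value is whatever the provider's cacheMarker returns.
const hypotheticalMarker = { anthropic: { cacheControl: { type: 'ephemeral' } } };

const markedTools = sortAndMarkSketch(
  {
    search: { description: 'Search the catalog' },
    describe: { description: 'Describe a table' },
  },
  hypotheticalMarker,
);
```

Passing `undefined` as the marker returns the tools in sorted order with nothing attached, which mirrors the path in the class where caching is inactive or `cacheMarker` yields no marker.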