fix(context): merge overlay columns onto manifest columns by name (#94)

* fix(context): merge overlay columns onto manifest columns by name

composeOverlay was appending overlay columns to the manifest column list,
producing duplicate entries when dbt/metabase overlays declared a column
just to attach descriptions. The duplicates carried no `type`, so the
pydantic SourceDefinition rejected them at semantic-query time and broke
`ktx sl query` for every overlay-backed measure. Now overlay columns
match base columns by name (case-insensitive): same-name entries merge
onto the manifest (overlay fields win, type/role fall back to the base,
descriptions merge per source key) and only new names append.

* refactor(sl): split overlay columns from column_overrides and enforce TS/Python wire contract

Overlay sources now have two distinct collections: `columns:` for computed
columns (requiring `expr` + `type`) and `column_overrides:` for metadata
patches to inherited manifest columns. Composing or loading an overlay that
mixes the two — or references an unknown column — fails with a typed error.

Introduce `ResolvedSemanticLayerSource` / `resolvedSourceSchema` /
`toResolvedWire` as the strict shape sent to the Python engine, and add a
schema contract test that diffs Zod against the Pydantic JSON schema dumped
by `python -m semantic_layer dump-schema`. `SourceDefinition` is now
`extra="forbid"` on the Python side.

`loadAllSources` surfaces per-file load errors instead of swallowing them,
so validation/query paths can report manifest shard parse failures.

* fix(context): make scan description generation resilient and quiet

A transient sampleTable failure during ingest used to take out every
table in a connection: generateTableDescription returned a hardcoded
'Table not found' string into descriptions.ai, and KtxDescriptionGenerator
was constructed without a logger, so the failure left no trail anywhere.

- sampleTable / sampleColumn calls retry 3x with 200/400/800ms backoff,
  honouring KtxScanContext.signal via a new KtxAbortedError.
- On retry exhaustion or missing capability, table generation falls back
  to a metadata-only prompt built from column name / native type / comment
  / rawDescriptions. The column path follows the same rule -- call the
  LLM when any of samples or rawDescriptions are available; skip only
  when both are absent.
- Logger is now threaded from KtxScanContext into the generator. Failures
  emit structured KtxScanWarning entries (new description_fallback_used
  code, plus existing sampling_failed / enrichment_failed /
  connector_capability_missing). ktx scan groups warnings by code so a
  batch of identical failures collapses to one summary line plus sample.
- Returns null on failure instead of the 'Table not found' sentinel; the
  manifest writer's existing guard already skips empty descriptions, so
  schema YAML no longer carries misleading text. SCAN_MANAGED_DESCRIPTION_KEYS
  already strips stale 'ai' on merge, so existing YAML clears on next run.

Also suppress AI SDK v6 'system in messages' warning: pull system messages
out of KtxMessageBuilder.wrapSimple's output via a new splitKtxSystemMessages
helper and pass them top-level to generateText (preserves cacheControl
providerOptions on the SystemModelMessage). Agent-runner's local
splitSystemPromptMessages dedupes onto the shared helper.

* test(docs): align examples-docs assertions with revamped docs

PR #103 (setup/guide doc revamp) reworded several CLI examples and
connection labels; the assertions in scripts/examples-docs.test.mjs
still referenced the pre-revamp wording and were failing in CI on main.
Update the regexes to match the post-revamp content:

- drop the `--json` flag from the sl-query example expectation
- move the `Driver:` / `Status: ok` probe to the connection reference,
  which is where that output now lives (driver id is lowercase
  `postgres`, not the display name `PostgreSQL`)
- drop the obsolete `Install \`uv\`...` troubleshooting line
- accept `<connectionId>` everywhere; the docs no longer use the
  hyphenated `<connection-id>` form
- match the `warehouse` connection id used in the quickstart instead of
  the `postgres-warehouse` id only used in the README and setup ref

* fix(sl): skip TS/Python schema contract test when uv is unavailable

The TypeScript checks CI job does not install uv or Python, so the
module-level `execFileSync('uv', ...)` in schemas.contract.test.ts threw
ENOENT and failed the suite. Wrap the schema dump in a try/catch and
guard the describe block with `describe.skipIf` so the test skips in
environments without uv. Local dev and any CI job that has uv on PATH
still runs the cross-language contract assertion.
This commit is contained in:
Andrey Avtomonov 2026-05-15 02:11:04 +02:00 committed by GitHub
parent 6bc8d200ea
commit cb8902f1e5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
56 changed files with 1650 additions and 237 deletions

View file

@ -38,7 +38,7 @@ describe('importMetricflowSemanticModels', () => {
const scoped = {
getManifestEntry: vi.fn().mockResolvedValue(null),
isManifestBacked: vi.fn().mockResolvedValue(false),
loadAllSources: vi.fn().mockResolvedValue([]),
loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }),
loadSource: vi.fn().mockResolvedValue(null),
writeSource: vi.fn().mockResolvedValue({ warnings: [] }),
};
@ -104,7 +104,7 @@ describe('importMetricflowSemanticModels', () => {
const scoped = {
getManifestEntry: vi.fn().mockResolvedValue(null),
isManifestBacked: vi.fn().mockResolvedValue(false),
loadAllSources: vi.fn().mockResolvedValue([]),
loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }),
loadSource: vi.fn().mockImplementation((connectionId: string, sourceName: string) =>
Promise.resolve(sourceName === 'orders' ? { name: 'orders' } : null),
),
@ -139,7 +139,7 @@ describe('importMetricflowSemanticModels', () => {
const scoped = {
getManifestEntry: vi.fn().mockResolvedValue(null),
isManifestBacked: vi.fn().mockResolvedValue(false),
loadAllSources: vi.fn().mockResolvedValue([]),
loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }),
loadSource: vi.fn().mockResolvedValue(null),
writeSource: vi.fn().mockRejectedValueOnce(new Error('cannot write orders')).mockResolvedValue({ warnings: [] }),
};
@ -190,7 +190,7 @@ describe('importMetricflowSemanticModels', () => {
isManifestBacked: vi.fn().mockImplementation(async (_connectionId: string, sourceName: string) => {
return sourceName === 'orders';
}),
loadAllSources: vi.fn().mockResolvedValue([]),
loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }),
loadSource: vi.fn().mockResolvedValue(null),
writeSource: vi.fn().mockImplementation(async (_connectionId: string, source: (typeof written)[number]) => {
written.push(source);
@ -268,7 +268,7 @@ describe('importMetricflowSemanticModels', () => {
isManifestBacked: vi.fn().mockImplementation(async (_connectionId: string, sourceName: string) => {
return sourceName === 'orders';
}),
loadAllSources: vi.fn().mockResolvedValue([]),
loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }),
loadSource: vi.fn().mockResolvedValue(null),
writeSource: vi.fn().mockResolvedValue({ warnings: [] }),
};
@ -311,7 +311,7 @@ describe('importMetricflowSemanticModels', () => {
const scoped = {
getManifestEntry: vi.fn().mockResolvedValue(null),
isManifestBacked: vi.fn().mockResolvedValue(false),
loadAllSources: vi.fn().mockResolvedValue([]),
loadAllSources: vi.fn().mockResolvedValue({ sources: [], loadErrors: [] }),
loadSource: vi.fn().mockResolvedValue(null),
writeSource: vi
.fn()

View file

@ -71,7 +71,7 @@ export async function importMetricflowSemanticModels(
let crossModelSourcesCreated = 0;
const preexistingSourceNames = new Set(
(await semanticLayerService.loadAllSources(input.connectionId)).map((source) => source.name),
(await semanticLayerService.loadAllSources(input.connectionId)).sources.map((source) => source.name),
);
const modelContexts: MetricflowSemanticModelImportContext[] = [];
const sourceNameByModelRef = new Map<string, string>();

View file

@ -187,7 +187,10 @@ const makeDeps = () => {
loadAllSources: vi
.fn()
.mockImplementation((connectionId: string) =>
Promise.resolve(connectionId === 'warehouse-2' ? [{ name: 'looker__orders' }] : []),
Promise.resolve({
sources: connectionId === 'warehouse-2' ? [{ name: 'looker__orders' }] : [],
loadErrors: [],
}),
),
};
const slSearchService = {
@ -1347,7 +1350,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
frontmatter: { sl_refs: ['looker__b2b__sales_pipeline.arr'] },
});
deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) =>
Promise.resolve([{ name: `${connectionId}_source` }]),
Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }),
);
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
@ -1447,7 +1450,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
parseArtifacts: { semanticModels: [{ name: 'orders' }] },
});
deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) =>
Promise.resolve([{ name: `${connectionId}_source` }]),
Promise.resolve({ sources: [{ name: `${connectionId}_source` }], loadErrors: [] }),
);
const postProcessor = {
run: vi.fn().mockResolvedValue({
@ -1631,7 +1634,10 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
const deps = makeDeps();
deps.adapter.listTargetConnectionIds = vi.fn().mockResolvedValue(['postgres-warehouse']);
deps.semanticLayerService.loadAllSources.mockImplementation((connectionId: string) =>
Promise.resolve(connectionId === 'postgres-warehouse' ? [{ name: 'stg_accounts' }] : []),
Promise.resolve({
sources: connectionId === 'postgres-warehouse' ? [{ name: 'stg_accounts' }] : [],
loadErrors: [],
}),
);
const runner = buildRunner(deps);
@ -1659,7 +1665,10 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
it('does not resolve qualified fallback table refs by source name alone', async () => {
const deps = makeDeps();
deps.semanticLayerService.loadAllSources.mockResolvedValue([{ name: 'orders', table: 'sales.orders' }]);
deps.semanticLayerService.loadAllSources.mockResolvedValue({
sources: [{ name: 'orders', table: 'sales.orders' }],
loadErrors: [],
});
const runner = buildRunner(deps);
await expect(

View file

@ -300,7 +300,7 @@ export class IngestBundleRunner {
const blocks = await Promise.all(
connectionIds.map(async (connectionId) => {
try {
const sources = await this.deps.semanticLayerService.loadAllSources(connectionId);
const { sources } = await this.deps.semanticLayerService.loadAllSources(connectionId);
const names = sources.map((source) => source.name).sort((left, right) => left.localeCompare(right));
const body = names.length > 0 ? names.join('\n') : '(no sources yet)';
return `## ${connectionId}\n${body}`;
@ -329,7 +329,7 @@ export class IngestBundleRunner {
): Promise<boolean> {
for (const connectionId of connectionIds) {
try {
const sources = await semanticLayerService.loadAllSources(connectionId);
const { sources } = await semanticLayerService.loadAllSources(connectionId);
if (sources.some((source) => semanticSourceMatchesTableRef(source, tableRef))) {
return true;
}
@ -1211,7 +1211,7 @@ export class IngestBundleRunner {
].sort();
for (const connectionId of touchedConnections) {
try {
const allSources = await this.deps.semanticLayerService.loadAllSources(connectionId);
const { sources: allSources } = await this.deps.semanticLayerService.loadAllSources(connectionId);
await this.deps.slSearchService.indexSources(connectionId, allSources);
} catch (err) {
this.logger.warn(

View file

@ -227,9 +227,10 @@ describe('PageTriageService', () => {
});
generateTextMock
.mockImplementationOnce((args: any) => {
const systemMessage = args.messages.find((m: { role: string }) => m.role === 'system');
const systemMessage = args.system ?? args.messages.find((m: { role: string }) => m.role === 'system');
const userMessage = args.messages.find((m: { role: string }) => m.role === 'user');
const systemText = systemMessage.content as string;
const systemText =
typeof systemMessage === 'string' ? systemMessage : (systemMessage.content as string);
const userText = userMessage.content as string;
expect(systemText).toContain(
'Reusable templates and scripts are durable knowledge regardless of subject matter.',

View file

@ -1,7 +1,7 @@
import { createHash } from 'node:crypto';
import { readdir, readFile } from 'node:fs/promises';
import { dirname, join, relative } from 'node:path';
import { KtxMessageBuilder, type KtxLlmProvider } from '@ktx/llm';
import { KtxMessageBuilder, splitKtxSystemMessages, type KtxLlmProvider } from '@ktx/llm';
import { generateText, type ToolSet } from 'ai';
import pLimit from 'p-limit';
import { z } from 'zod';
@ -346,10 +346,12 @@ export class PageTriageService {
tools: {},
model,
});
const split = splitKtxSystemMessages(built.messages);
const result = await this.runGenerateText({
model,
temperature: 0,
messages: built.messages,
...(split.system ? { system: split.system } : {}),
messages: split.messages,
tools: built.tools as ToolSet,
});
return result.text;

View file

@ -44,23 +44,26 @@ describe('repairWikiSlRefs', () => {
})),
};
const semanticLayerService = {
loadAllSources: vi.fn(async () => [
{
name: 'mart_customer_health',
grain: [],
columns: [],
joins: [],
measures: [{ name: 'high_risk_account_count', expr: 'count(*)' }],
segments: [{ name: 'high_risk', expr: "risk_level = 'high'" }],
},
{
name: 'int_procurement_qualifying_actions',
grain: [],
columns: [],
joins: [],
measures: [],
},
]),
loadAllSources: vi.fn(async () => ({
sources: [
{
name: 'mart_customer_health',
grain: [],
columns: [],
joins: [],
measures: [{ name: 'high_risk_account_count', expr: 'count(*)' }],
segments: [{ name: 'high_risk', expr: "risk_level = 'high'" }],
},
{
name: 'int_procurement_qualifying_actions',
grain: [],
columns: [],
joins: [],
measures: [],
},
],
loadErrors: [],
})),
};
const result = await repairWikiSlRefs({

View file

@ -56,7 +56,8 @@ async function loadVisibleSlRefs(
const warnings: string[] = [];
for (const connectionId of connectionIds) {
try {
for (const source of await semanticLayerService.loadAllSources(connectionId)) {
const { sources } = await semanticLayerService.loadAllSources(connectionId);
for (const source of sources) {
for (const ref of entityRefsForSource(source)) {
refs.add(ref);
}