mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-28 08:49:38 +02:00
refactor(sl): split overlay columns from column_overrides and enforce TS/Python wire contract
Overlay sources now have two distinct collections: `columns:` for computed columns (requiring `expr` + `type`) and `column_overrides:` for metadata patches to inherited manifest columns. Composing or loading an overlay that mixes the two — or references an unknown column — fails with a typed error. Introduce `ResolvedSemanticLayerSource` / `resolvedSourceSchema` / `toResolvedWire` as the strict shape sent to the Python engine, and add a schema contract test that diffs Zod against the Pydantic JSON schema dumped by `python -m semantic_layer dump-schema`. `SourceDefinition` is now `extra="forbid"` on the Python side. `loadAllSources` surfaces per-file load errors instead of swallowing them, so validation/query paths can report manifest shard parse failures.
This commit is contained in:
parent
3e12a9fef4
commit
f561bfa850
42 changed files with 847 additions and 193 deletions
|
|
@ -4,8 +4,14 @@ import { noopLogger } from '../core/index.js';
|
|||
import type { TableUsageOutput } from '../ingest/adapters/historic-sql/skill-schemas.js';
|
||||
import type { SlConnectionCatalogPort, SlPythonPort } from './ports.js';
|
||||
import { normalizeSemanticLayerDescriptions } from './description-normalization.js';
|
||||
import { isOverlaySource, sourceDefinitionSchema, sourceOverlaySchema } from './schemas.js';
|
||||
import type { SemanticLayerQueryExecutionResult, SemanticLayerQueryInput, SemanticLayerSource } from './types.js';
|
||||
import { isOverlaySource, resolvedSourceSchema, sourceDefinitionSchema, sourceOverlaySchema } from './schemas.js';
|
||||
import type {
|
||||
ResolvedSemanticLayerSource,
|
||||
SemanticLayerColumnOverride,
|
||||
SemanticLayerQueryExecutionResult,
|
||||
SemanticLayerQueryInput,
|
||||
SemanticLayerSource,
|
||||
} from './types.js';
|
||||
|
||||
interface WriteSourceOptions {
|
||||
skipValidation?: boolean;
|
||||
|
|
@ -14,6 +20,30 @@ interface WriteSourceOptions {
|
|||
const SL_DIR_PREFIX = 'semantic-layer';
|
||||
const CONNECTION_ID_PATTERN = /^[a-zA-Z0-9][a-zA-Z0-9_-]*$/;
|
||||
|
||||
export interface LoadAllSourcesResult {
|
||||
sources: SemanticLayerSource[];
|
||||
loadErrors: string[];
|
||||
}
|
||||
|
||||
export class UnknownColumnOverrideError extends Error {}
|
||||
export class ColumnNameCollisionError extends Error {}
|
||||
export class ConflictingExcludeAndOverrideError extends Error {}
|
||||
class ComposeContractError extends Error {}
|
||||
|
||||
function isComposeError(error: unknown): boolean {
|
||||
return (
|
||||
error instanceof UnknownColumnOverrideError ||
|
||||
error instanceof ColumnNameCollisionError ||
|
||||
error instanceof ConflictingExcludeAndOverrideError ||
|
||||
error instanceof ComposeContractError
|
||||
);
|
||||
}
|
||||
|
||||
function formatComposeError(filePath: string, error: unknown): string {
|
||||
const message = error instanceof Error ? error.message : String(error);
|
||||
return `${filePath}: ${message}`;
|
||||
}
|
||||
|
||||
function formatPortError(error: unknown, fallback: string): string {
|
||||
if (typeof error === 'string') {
|
||||
return error;
|
||||
|
|
@ -37,6 +67,24 @@ function formatPortError(error: unknown, fallback: string): string {
|
|||
return fallback;
|
||||
}
|
||||
|
||||
export function toResolvedWire(source: SemanticLayerSource): ResolvedSemanticLayerSource {
|
||||
const stripped = {
|
||||
...source,
|
||||
columns: source.columns.map((column) => ({ ...column })),
|
||||
joins: source.joins.map(({ source: _source, ...join }) => join),
|
||||
} as Record<string, unknown>;
|
||||
delete stripped.inherits_columns_from;
|
||||
delete stripped.usage;
|
||||
delete stripped.source_type;
|
||||
|
||||
const parsed = resolvedSourceSchema.safeParse(stripped);
|
||||
if (!parsed.success) {
|
||||
const issues = parsed.error.issues.map((issue) => `${issue.path.join('.')}: ${issue.message}`).join('; ');
|
||||
throw new ComposeContractError(`resolved source '${source.name}' violates the TS/Python contract: ${issues}`);
|
||||
}
|
||||
return parsed.data as ResolvedSemanticLayerSource;
|
||||
}
|
||||
|
||||
export class SemanticLayerService {
|
||||
constructor(
|
||||
private readonly configService: KtxFileStorePort,
|
||||
|
|
@ -158,16 +206,17 @@ export class SemanticLayerService {
|
|||
}
|
||||
}
|
||||
|
||||
async loadAllSources(connectionId: string): Promise<SemanticLayerSource[]> {
|
||||
async loadAllSources(connectionId: string): Promise<LoadAllSourcesResult> {
|
||||
const dir = `${SL_DIR_PREFIX}/${connectionId}`;
|
||||
const schemaDir = `${dir}/_schema`;
|
||||
const loadErrors: string[] = [];
|
||||
|
||||
let allFiles: string[];
|
||||
try {
|
||||
const result = await this.configService.listFiles(dir);
|
||||
allFiles = result.files.filter((f) => f.endsWith('.yaml'));
|
||||
} catch {
|
||||
return [];
|
||||
return { sources: [], loadErrors };
|
||||
}
|
||||
|
||||
// 1. Load manifest shards from _schema/*.yaml → project to sources
|
||||
|
|
@ -184,7 +233,9 @@ export class SemanticLayerService {
|
|||
}
|
||||
}
|
||||
} catch (e) {
|
||||
this.logger.warn(`Failed to parse manifest shard ${filePath}: ${e}`);
|
||||
const message = `Failed to parse manifest shard ${filePath}: ${e instanceof Error ? e.message : String(e)}`;
|
||||
loadErrors.push(message);
|
||||
this.logger.warn(message);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -227,6 +278,7 @@ export class SemanticLayerService {
|
|||
);
|
||||
}
|
||||
}
|
||||
toResolvedWire(standalone);
|
||||
sources.set(name, standalone);
|
||||
} else {
|
||||
// Overlay — compose with manifest entry if present
|
||||
|
|
@ -238,11 +290,15 @@ export class SemanticLayerService {
|
|||
}
|
||||
}
|
||||
} catch (e) {
|
||||
this.logger.warn(`Failed to parse YAML file ${filePath}: ${e}`);
|
||||
const message = isComposeError(e)
|
||||
? formatComposeError(filePath, e)
|
||||
: `Failed to parse YAML file ${filePath}: ${e instanceof Error ? e.message : String(e)}`;
|
||||
loadErrors.push(message);
|
||||
this.logger.warn(message);
|
||||
}
|
||||
}
|
||||
|
||||
return Array.from(sources.values());
|
||||
return { sources: Array.from(sources.values()), loadErrors };
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -622,8 +678,10 @@ export class SemanticLayerService {
|
|||
connectionId: string,
|
||||
proposedSource: SemanticLayerSource,
|
||||
): Promise<{ errors: string[]; warnings: string[]; perSourceWarnings: Record<string, string[]> }> {
|
||||
const existing = await this.loadAllSources(connectionId);
|
||||
const loaded = await this.loadAllSources(connectionId);
|
||||
const existing = loaded.sources;
|
||||
const merged = existing.filter((s) => s.name !== proposedSource.name);
|
||||
const loadErrors = [...loaded.loadErrors];
|
||||
|
||||
// Overlays (no table/sql) must be composed with their manifest base before
|
||||
// validation, otherwise the filter below drops them and the edited source
|
||||
|
|
@ -641,11 +699,27 @@ export class SemanticLayerService {
|
|||
perSourceWarnings: {},
|
||||
};
|
||||
}
|
||||
toPush = composeOverlay(base, { ...proposedSource });
|
||||
try {
|
||||
toPush = composeOverlay(base, { ...proposedSource });
|
||||
} catch (error) {
|
||||
return {
|
||||
errors: [...loadErrors, formatComposeError(`${proposedSource.name}.yaml`, error)],
|
||||
warnings: [],
|
||||
perSourceWarnings: {},
|
||||
};
|
||||
}
|
||||
} else if (proposedSource.inherits_columns_from) {
|
||||
const base = await this.findManifestEntryByTableRef(connectionId, proposedSource.inherits_columns_from);
|
||||
if (base) {
|
||||
toPush = enrichColumnsFromManifest(proposedSource, base);
|
||||
try {
|
||||
toPush = enrichColumnsFromManifest(proposedSource, base);
|
||||
} catch (error) {
|
||||
return {
|
||||
errors: [...loadErrors, formatComposeError(`${proposedSource.name}.yaml`, error)],
|
||||
warnings: [],
|
||||
perSourceWarnings: {},
|
||||
};
|
||||
}
|
||||
}
|
||||
// Miss is non-fatal — the source ships unenriched, validator will surface
|
||||
// any column-without-type errors via the warehouse probe.
|
||||
|
|
@ -654,37 +728,37 @@ export class SemanticLayerService {
|
|||
|
||||
const validatable = merged.filter((s) => s.table != null || s.sql != null);
|
||||
if (validatable.length === 0) {
|
||||
return { errors: [], warnings: [], perSourceWarnings: {} };
|
||||
return { errors: loadErrors, warnings: [], perSourceWarnings: {} };
|
||||
}
|
||||
|
||||
const dialect = await this.getDialectForConnection(connectionId);
|
||||
|
||||
try {
|
||||
const { data, error } = await this.python.validateSources({
|
||||
sources: validatable,
|
||||
sources: validatable.map(toResolvedWire),
|
||||
dialect,
|
||||
recently_touched: [proposedSource.name],
|
||||
});
|
||||
if (error) {
|
||||
const errorMsg = formatPortError(error, 'Unknown validation error');
|
||||
return { errors: [errorMsg], warnings: [], perSourceWarnings: {} };
|
||||
return { errors: [...loadErrors, errorMsg], warnings: [], perSourceWarnings: {} };
|
||||
}
|
||||
if (!data) {
|
||||
return {
|
||||
errors: await this.validatePhysicalTableReferences(connectionId, validatable),
|
||||
errors: [...loadErrors, ...(await this.validatePhysicalTableReferences(connectionId, validatable))],
|
||||
warnings: [],
|
||||
perSourceWarnings: {},
|
||||
};
|
||||
}
|
||||
const physicalErrors = await this.validatePhysicalTableReferences(connectionId, validatable);
|
||||
return {
|
||||
errors: [...(data.errors ?? []), ...physicalErrors],
|
||||
errors: [...loadErrors, ...(data.errors ?? []), ...physicalErrors],
|
||||
warnings: data.warnings ?? [],
|
||||
perSourceWarnings: data.per_source_warnings ?? {},
|
||||
};
|
||||
} catch (e) {
|
||||
return {
|
||||
errors: [`Validation call failed: ${e instanceof Error ? e.message : String(e)}`],
|
||||
errors: [...loadErrors, `Validation call failed: ${e instanceof Error ? e.message : String(e)}`],
|
||||
warnings: [],
|
||||
perSourceWarnings: {},
|
||||
};
|
||||
|
|
@ -692,23 +766,23 @@ export class SemanticLayerService {
|
|||
}
|
||||
|
||||
async validateSourcesForConnection(connectionId: string): Promise<{ errors: string[]; warnings: string[] }> {
|
||||
const allSources = await this.loadAllSources(connectionId);
|
||||
const { sources: allSources, loadErrors } = await this.loadAllSources(connectionId);
|
||||
const sources = allSources.filter((source) => source.table != null || source.sql != null);
|
||||
if (sources.length === 0) {
|
||||
return { errors: [], warnings: [] };
|
||||
return { errors: loadErrors, warnings: [] };
|
||||
}
|
||||
|
||||
const dialect = await this.getDialectForConnection(connectionId);
|
||||
const { data, error } = await this.python.validateSources({ sources, dialect });
|
||||
const { data, error } = await this.python.validateSources({ sources: sources.map(toResolvedWire), dialect });
|
||||
if (error) {
|
||||
return { errors: [formatPortError(error, 'Unknown validation error')], warnings: [] };
|
||||
return { errors: [...loadErrors, formatPortError(error, 'Unknown validation error')], warnings: [] };
|
||||
}
|
||||
if (!data) {
|
||||
return { errors: await this.validatePhysicalTableReferences(connectionId, sources), warnings: [] };
|
||||
return { errors: [...loadErrors, ...(await this.validatePhysicalTableReferences(connectionId, sources))], warnings: [] };
|
||||
}
|
||||
const physicalErrors = await this.validatePhysicalTableReferences(connectionId, sources);
|
||||
return {
|
||||
errors: [...(data.errors ?? []), ...physicalErrors],
|
||||
errors: [...loadErrors, ...(data.errors ?? []), ...physicalErrors],
|
||||
warnings: data.warnings ?? [],
|
||||
};
|
||||
}
|
||||
|
|
@ -802,6 +876,7 @@ export class SemanticLayerService {
|
|||
} else {
|
||||
// Overlay — check references against manifest
|
||||
const excludeColumns = (data.exclude_columns as string[]) ?? [];
|
||||
const columnOverrides = (data.column_overrides as Array<{ name: string }> | undefined) ?? [];
|
||||
const disableJoins = (data.disable_joins as string[]) ?? [];
|
||||
const cols = manifestColumns.get(name);
|
||||
const joins = manifestJoins.get(name);
|
||||
|
|
@ -817,6 +892,16 @@ export class SemanticLayerService {
|
|||
}
|
||||
}
|
||||
|
||||
const excluded = new Set(excludeColumns);
|
||||
for (const override of columnOverrides) {
|
||||
if (!cols.has(override.name)) {
|
||||
warnings.push(`${name}: column_overrides references non-existent column '${override.name}'`);
|
||||
}
|
||||
if (excluded.has(override.name)) {
|
||||
warnings.push(`${name}: column '${override.name}' appears in both exclude_columns and column_overrides`);
|
||||
}
|
||||
}
|
||||
|
||||
for (const joinOn of disableJoins) {
|
||||
const normalized = joinOn.replace(/\s+/g, ' ').trim();
|
||||
if (!joins?.has(normalized)) {
|
||||
|
|
@ -999,7 +1084,10 @@ export class SemanticLayerService {
|
|||
*/
|
||||
async executeQuery(connectionId: string, query: SemanticLayerQueryInput): Promise<SemanticLayerQueryExecutionResult> {
|
||||
// 1. Load sources, filtering out sources with no table or sql
|
||||
const allSources = await this.loadAllSources(connectionId);
|
||||
const { sources: allSources, loadErrors } = await this.loadAllSources(connectionId);
|
||||
if (loadErrors.length > 0) {
|
||||
throw new Error(`Semantic layer source load failed: ${loadErrors.join('; ')}`);
|
||||
}
|
||||
const sources = allSources.filter((s) => {
|
||||
if (!s.table && !s.sql) {
|
||||
this.logger.warn(`Skipping source "${s.name}" with no table or sql defined`);
|
||||
|
|
@ -1021,7 +1109,7 @@ export class SemanticLayerService {
|
|||
|
||||
// 3. Generate SQL via python SL engine
|
||||
const { data: slResult, error: slError } = await this.python.query({
|
||||
sources,
|
||||
sources: sources.map(toResolvedWire),
|
||||
query,
|
||||
dialect,
|
||||
});
|
||||
|
|
@ -1092,18 +1180,20 @@ export function projectManifestEntry(name: string, entry: ManifestTableEntry): S
|
|||
const grain = pkColumns.length > 0 ? pkColumns : entry.columns.map((c) => c.name);
|
||||
|
||||
// Table-level dbt config from manifest shards is surfaced on the source for search / tools.
|
||||
return {
|
||||
const source: SemanticLayerSource = {
|
||||
name,
|
||||
table: entry.table,
|
||||
descriptions: entry.descriptions,
|
||||
grain,
|
||||
columns,
|
||||
joins: (entry.joins ?? []).map((j) => ({ to: j.to, on: j.on, relationship: j.relationship, source: j.source })),
|
||||
joins: (entry.joins ?? []).map((j) => ({ to: j.to, on: j.on, relationship: j.relationship })),
|
||||
measures: [],
|
||||
...(entry.tags?.dbt?.length ? { tags: entry.tags } : {}),
|
||||
...(entry.freshness?.dbt ? { freshness: entry.freshness } : {}),
|
||||
...(entry.usage ? { usage: entry.usage } : {}),
|
||||
};
|
||||
toResolvedWire(source);
|
||||
return source;
|
||||
}
|
||||
|
||||
function normalizeWs(s: string): string {
|
||||
|
|
@ -1331,6 +1421,7 @@ const COMPOSE_KNOWN_KEYS = new Set([
|
|||
'descriptions',
|
||||
'grain',
|
||||
'columns',
|
||||
'column_overrides',
|
||||
'joins',
|
||||
'measures',
|
||||
'segments',
|
||||
|
|
@ -1365,27 +1456,48 @@ export function composeOverlay(base: SemanticLayerSource, overlay: Record<string
|
|||
result.usage = normalizedOverlay.usage as SemanticLayerSource['usage'];
|
||||
}
|
||||
|
||||
// Filter out excluded columns
|
||||
const excluded = new Set((normalizedOverlay.exclude_columns as string[] | undefined) ?? []);
|
||||
const baseColumns = result.columns.filter((c) => !excluded.has(c.name));
|
||||
|
||||
// Overlay columns matched by name merge onto the base column (overlay fields win, but
|
||||
// the base column's type/role/etc are preserved when the overlay omits them — dbt-style
|
||||
// overlays often declare a column only to attach descriptions). New names append.
|
||||
const overlayColumns = (normalizedOverlay.columns as SemanticLayerSource['columns'] | undefined) ?? [];
|
||||
const baseByName = new Map(baseColumns.map((c) => [c.name.toLowerCase(), c]));
|
||||
const mergedAppended: SemanticLayerSource['columns'] = [];
|
||||
const mergedByName = new Map<string, SemanticLayerSource['columns'][number]>();
|
||||
for (const overlay of overlayColumns) {
|
||||
const key = overlay.name.toLowerCase();
|
||||
const base = baseByName.get(key);
|
||||
if (base) {
|
||||
mergedByName.set(key, mergeOverlayColumn(base, overlay));
|
||||
} else {
|
||||
mergedAppended.push(overlay);
|
||||
}
|
||||
const columnOverrides = (normalizedOverlay.column_overrides as SemanticLayerColumnOverride[] | undefined) ?? [];
|
||||
const overrideNames = columnOverrides.map((column) => column.name);
|
||||
const conflictingOverrides = overrideNames.filter((name) => excluded.has(name));
|
||||
if (conflictingOverrides.length > 0) {
|
||||
throw new ConflictingExcludeAndOverrideError(
|
||||
`column_overrides conflict with exclude_columns for '${base.name}': ${conflictingOverrides.join(', ')}`,
|
||||
);
|
||||
}
|
||||
result.columns = [...baseColumns.map((c) => mergedByName.get(c.name.toLowerCase()) ?? c), ...mergedAppended];
|
||||
|
||||
const baseByLowerName = new Map(base.columns.map((column) => [column.name.toLowerCase(), column]));
|
||||
const columnsByLowerName = new Map(
|
||||
result.columns.filter((column) => !excluded.has(column.name)).map((column) => [column.name.toLowerCase(), column]),
|
||||
);
|
||||
|
||||
for (const override of columnOverrides) {
|
||||
const key = override.name.toLowerCase();
|
||||
const baseColumn = baseByLowerName.get(key);
|
||||
if (!baseColumn) {
|
||||
throw new UnknownColumnOverrideError(
|
||||
`column '${override.name}' in column_overrides does not exist on manifest source '${base.name}'`,
|
||||
);
|
||||
}
|
||||
const baseDescriptions = baseColumn.descriptions ?? {};
|
||||
const overrideDescriptions = override.descriptions ?? {};
|
||||
const merged = { ...baseColumn, ...override };
|
||||
if (Object.keys(baseDescriptions).length > 0 || Object.keys(overrideDescriptions).length > 0) {
|
||||
merged.descriptions = { ...baseDescriptions, ...overrideDescriptions };
|
||||
}
|
||||
columnsByLowerName.set(key, merged);
|
||||
}
|
||||
|
||||
const computedColumns = (normalizedOverlay.columns as SemanticLayerSource['columns'] | undefined) ?? [];
|
||||
for (const column of computedColumns) {
|
||||
if (baseByLowerName.has(column.name.toLowerCase())) {
|
||||
throw new ColumnNameCollisionError(
|
||||
`column '${column.name}' in columns patches a manifest column on '${base.name}' — move it to 'column_overrides:'`,
|
||||
);
|
||||
}
|
||||
columnsByLowerName.set(column.name.toLowerCase(), column);
|
||||
}
|
||||
result.columns = [...columnsByLowerName.values()];
|
||||
|
||||
// Measures from overlay only
|
||||
result.measures = (normalizedOverlay.measures as SemanticLayerSource['measures'] | undefined) ?? [];
|
||||
|
|
@ -1414,6 +1526,12 @@ export function composeOverlay(base: SemanticLayerSource, overlay: Record<string
|
|||
const newJoins = overlayJoins.filter((j) => !existingKeys.has(`${j.to}::${normalizeWs(j.on)}`));
|
||||
result.joins = [...manifestJoins, ...newJoins];
|
||||
|
||||
const overlayParse = sourceOverlaySchema.safeParse(normalizedOverlay);
|
||||
if (!overlayParse.success) {
|
||||
const issues = overlayParse.error.issues.map((issue) => `${issue.path.join('.')}: ${issue.message}`).join('; ');
|
||||
throw new ComposeContractError(`overlay for '${base.name}' violates the authoring schema: ${issues}`);
|
||||
}
|
||||
toResolvedWire(result);
|
||||
return result;
|
||||
}
|
||||
|
||||
|
|
@ -1445,32 +1563,6 @@ function parseJoinOn(
|
|||
return { fromColumn: leftCol, toColumn: rightCol };
|
||||
}
|
||||
|
||||
/**
|
||||
* Merge an overlay column declaration onto a matching manifest column. Overlay fields
|
||||
* win, except descriptions (plural) which merge per source key. Manifest values are
|
||||
* preserved when the overlay omits them — this lets dbt/metabase emit description-only
|
||||
* overlay column entries without redeclaring `type:` (which would have to mirror the
|
||||
* scan column and rot when the schema changes).
|
||||
*/
|
||||
function mergeOverlayColumn(
|
||||
base: SemanticLayerSource['columns'][number],
|
||||
overlay: SemanticLayerSource['columns'][number],
|
||||
): SemanticLayerSource['columns'][number] {
|
||||
const merged: SemanticLayerSource['columns'][number] = { ...base, ...overlay };
|
||||
if (!overlay.type && base.type) {
|
||||
merged.type = base.type;
|
||||
}
|
||||
if (!overlay.role && base.role) {
|
||||
merged.role = base.role;
|
||||
}
|
||||
const baseDescriptions = base.descriptions ?? {};
|
||||
const overlayDescriptions = overlay.descriptions ?? {};
|
||||
if (Object.keys(baseDescriptions).length > 0 || Object.keys(overlayDescriptions).length > 0) {
|
||||
merged.descriptions = { ...baseDescriptions, ...overlayDescriptions };
|
||||
}
|
||||
return merged;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fill any blank `type`, `descriptions`, or `role` on the source's columns from the
|
||||
* matching manifest column (by name). Local values always win. Columns absent from
|
||||
|
|
@ -1503,5 +1595,7 @@ export function enrichColumnsFromManifest(
|
|||
}
|
||||
return merged;
|
||||
});
|
||||
return { ...source, columns: enrichedColumns };
|
||||
const enriched = { ...source, columns: enrichedColumns };
|
||||
toResolvedWire(enriched);
|
||||
return enriched;
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue