mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
* fix(context): merge overlay columns onto manifest columns by name

  composeOverlay was appending overlay columns to the manifest column list, producing duplicate entries when dbt/metabase overlays declared a column just to attach descriptions. The duplicates carried no `type`, so the pydantic SourceDefinition rejected them at semantic-query time and broke `ktx sl query` for every overlay-backed measure.

  Now overlay columns match base columns by name (case-insensitive): same-name entries merge onto the manifest (overlay fields win, type/role fall back to the base, descriptions merge per source key) and only new names append.

* refactor(sl): split overlay columns from column_overrides and enforce TS/Python wire contract

  Overlay sources now have two distinct collections: `columns:` for computed columns (requiring `expr` + `type`) and `column_overrides:` for metadata patches to inherited manifest columns. Composing or loading an overlay that mixes the two, or references an unknown column, fails with a typed error.

  Introduce `ResolvedSemanticLayerSource` / `resolvedSourceSchema` / `toResolvedWire` as the strict shape sent to the Python engine, and add a schema contract test that diffs Zod against the Pydantic JSON schema dumped by `python -m semantic_layer dump-schema`. `SourceDefinition` is now `extra="forbid"` on the Python side.

  `loadAllSources` surfaces per-file load errors instead of swallowing them, so validation/query paths can report manifest shard parse failures.

* fix(context): make scan description generation resilient and quiet

  A transient sampleTable failure during ingest used to take out every table in a connection: generateTableDescription returned a hardcoded 'Table not found' string into descriptions.ai, and KtxDescriptionGenerator was constructed without a logger, so the failure left no trail anywhere.

  - sampleTable / sampleColumn calls retry 3x with 200/400/800ms backoff, honouring KtxScanContext.signal via a new KtxAbortedError.
  - On retry exhaustion or missing capability, table generation falls back to a metadata-only prompt built from column name / native type / comment / rawDescriptions. The column path follows the same rule -- call the LLM when any of samples or rawDescriptions are available; skip only when both are absent.
  - Logger is now threaded from KtxScanContext into the generator. Failures emit structured KtxScanWarning entries (new description_fallback_used code, plus existing sampling_failed / enrichment_failed / connector_capability_missing). ktx scan groups warnings by code so a batch of identical failures collapses to one summary line plus sample.
  - Returns null on failure instead of the 'Table not found' sentinel; the manifest writer's existing guard already skips empty descriptions, so schema YAML no longer carries misleading text. SCAN_MANAGED_DESCRIPTION_KEYS already strips stale 'ai' on merge, so existing YAML clears on next run.

  Also suppress AI SDK v6 'system in messages' warning: pull system messages out of KtxMessageBuilder.wrapSimple's output via a new splitKtxSystemMessages helper and pass them top-level to generateText (preserves cacheControl providerOptions on the SystemModelMessage). Agent-runner's local splitSystemPromptMessages dedupes onto the shared helper.

* test(docs): align examples-docs assertions with revamped docs

  PR #103 (setup/guide doc revamp) reworded several CLI examples and connection labels; the assertions in scripts/examples-docs.test.mjs still referenced the pre-revamp wording and were failing in CI on main.

  Update the regexes to match the post-revamp content:

  - drop the `--json` flag from the sl-query example expectation
  - move the `Driver:` / `Status: ok` probe to the connection reference, which is where that output now lives (driver id is lowercase `postgres`, not the display name `PostgreSQL`)
  - drop the obsolete `Install \`uv\`...` troubleshooting line
  - accept `<connectionId>` everywhere; the docs no longer use the hyphenated `<connection-id>` form
  - match the `warehouse` connection id used in the quickstart instead of the `postgres-warehouse` id only used in the README and setup ref

* fix(sl): skip TS/Python schema contract test when uv is unavailable

  The TypeScript checks CI job does not install uv or Python, so the module-level `execFileSync('uv', ...)` in schemas.contract.test.ts threw ENOENT and failed the suite. Wrap the schema dump in a try/catch and guard the describe block with `describe.skipIf` so the test skips in environments without uv. Local dev and any CI job that has uv on PATH still runs the cross-language contract assertion.
227 lines
7.4 KiB
Python
"""Manifest models and projection for the two-tier schema architecture.

The manifest (`_schema/*.yaml`) stores physical table catalog data with DB-native
types, PK flags, and join provenance. This module handles:
- Manifest-specific data models (ManifestColumn, ManifestJoin, ManifestEntry)
- DB-native → semantic type mapping
- Projection from ManifestEntry → SourceDefinition
"""

from __future__ import annotations

from typing import Literal

from pydantic import BaseModel

from semantic_layer.models import (
    ColumnRole,
    DefaultTimeDimensionDbt,
    FreshnessDbt,
    JoinDeclaration,
    SourceColumn,
    SourceColumnTests,
    SourceDefinition,
)

# ── Type mapping (DB-native → semantic) ─────────────────────────────

_TYPE_MAP: dict[str, str] = {
    # number family
    "integer": "number",
    "bigint": "number",
    "smallint": "number",
    "numeric": "number",
    "decimal": "number",
    "float": "number",
    "double": "number",
    "real": "number",
    "int": "number",
    "int2": "number",
    "int4": "number",
    "int8": "number",
    "float4": "number",
    "float8": "number",
    "double precision": "number",
    "number": "number",
    "tinyint": "number",
    "mediumint": "number",
    # time family
    "timestamp": "time",
    "timestamptz": "time",
    "timestamp with time zone": "time",
    "timestamp without time zone": "time",
    "timestamp_ntz": "time",
    "timestamp_ltz": "time",
    "timestamp_tz": "time",
    "datetime": "time",
    "date": "time",
    "time": "time",
    "timetz": "time",
    # boolean family
    "boolean": "boolean",
    "bool": "boolean",
    # fallback → 'string'
}


def map_column_type(db_type: str) -> str:
    """Map a DB-native column type to a semantic type (string/number/time/boolean)."""
    normalized = db_type.lower().split("(")[0].strip()
    return _TYPE_MAP.get(normalized, "string")


# ── Manifest data models ────────────────────────────────────────────


_DEFAULT_PRIORITY = ["user", "ai", "dbt", "db"]


def _description_sources(descriptions: dict[str, str] | None) -> dict[str, str] | None:
    """Normalize multi-source descriptions to a keyed map."""
    if descriptions:
        result = {source: text for source, text in descriptions.items() if text}
        if result:
            return result
    return None


def _resolve_description(descriptions: dict[str, str] | None) -> str | None:
    """Resolve a single description from a multi-source map."""
    if descriptions:
        for source in _DEFAULT_PRIORITY:
            if text := descriptions.get(source):
                return text
        # Fallback: first available
        for text in descriptions.values():
            if text:
                return text
    return None


class ManifestColumn(BaseModel):
    name: str
    type: str  # DB-native type (e.g., "integer", "varchar", "timestamp")
    pk: bool = False
    nullable: bool = True
    descriptions: dict[str, str] | None = None
    constraints: dict | None = None
    enum_values: dict[str, list[str]] | None = None
    tests: SourceColumnTests | None = None

    @property
    def resolved_description(self) -> str | None:
        return _resolve_description(self.descriptions)


class ManifestJoin(BaseModel):
    to: str
    on: str
    relationship: Literal["many_to_one", "one_to_many", "one_to_one"]
    source: Literal["formal", "inferred", "manual"] = "formal"


class ManifestEntry(BaseModel):
    table: str
    descriptions: dict[str, str] | None = None
    columns: list[ManifestColumn]
    joins: list[ManifestJoin] = []
    default_time_dimension: DefaultTimeDimensionDbt | None = None
    tags: dict[str, list[str]] | None = None
    freshness: dict[str, FreshnessDbt] | None = None

    @property
    def resolved_description(self) -> str | None:
        return _resolve_description(self.descriptions)


class Manifest(BaseModel):
    """A single manifest shard file (`_schema/{schema}.yaml`)."""

    tables: dict[str, ManifestEntry]


# ── Projection ──────────────────────────────────────────────────────


def validate_overlay(
    data: dict, manifest_column_names: set[str] | None = None
) -> list[str]:
    """Validate that overlay data doesn't contain structural fields.

    Returns a list of error messages (empty if valid).
    """
    errors: list[str] = []
    if "description" in data:
        errors.append("Overlay must use 'descriptions' for source descriptions")
    if "table" in data:
        errors.append("Overlay must not contain 'table' (owned by manifest)")
    if "sql" in data:
        errors.append(
            "Overlay must not contain 'sql' (that makes it a standalone source)"
        )
    for col in data.get("columns", []):
        if "description" in col:
            errors.append(
                f"Overlay column '{col.get('name', '?')}' must use 'descriptions'"
            )
        if "expr" not in col:
            errors.append(
                f"Overlay column '{col.get('name', '?')}' in 'columns' must define "
                f"'expr' and 'type' (use 'column_overrides' to patch manifest columns)"
            )
        if "type" not in col:
            errors.append(
                f"Overlay column '{col.get('name', '?')}' in 'columns' must define "
                f"'type' and 'expr' (use 'column_overrides' to patch manifest columns)"
            )
    for col in data.get("column_overrides", []):
        name = col.get("name", "?")
        if "description" in col:
            errors.append(f"Column override '{name}' must use 'descriptions'")
        if "type" in col:
            errors.append(f"Column override '{name}' must not contain 'type'")
        if "expr" in col:
            errors.append(f"Column override '{name}' must not contain 'expr'")
        if manifest_column_names is not None and name not in manifest_column_names:
            errors.append(f"Column override '{name}' does not match a manifest column")
    return errors


def project_manifest_entry(name: str, entry: ManifestEntry) -> SourceDefinition:
    """Convert a raw manifest entry into a valid SourceDefinition.

    - Maps DB-native column types to semantic types
    - Auto-derives grain from PK columns (or all columns if no PKs)
    - Strips join provenance (source field)
    """
    columns = [
        SourceColumn(
            name=c.name,
            type=map_column_type(c.type),
            role=ColumnRole.TIME
            if map_column_type(c.type) == "time"
            else ColumnRole.DEFAULT,
            description=c.resolved_description,
            constraints=c.constraints,
            enum_values=c.enum_values,
            tests=c.tests,
        )
        for c in entry.columns
    ]
    pk_columns = [c.name for c in entry.columns if c.pk]
    grain = pk_columns if pk_columns else [c.name for c in entry.columns]

    return SourceDefinition(
        name=name,
        table=entry.table,
        description=entry.resolved_description,
        grain=grain,
        columns=columns,
        joins=[
            JoinDeclaration(to=j.to, on=j.on, relationship=j.relationship)
            for j in entry.joins
        ],
        default_time_dimension=entry.default_time_dimension,
        tags=entry.tags,
        freshness=entry.freshness,
    )
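
The three projection rules in the module above (type normalization, description priority, and grain derivation) can be tried in isolation with the sketch below. It is not part of the file: the type map is deliberately trimmed, `resolve_description` mirrors `_resolve_description`, and `derive_grain` with its `(name, is_pk)` tuples is a hypothetical stand-in for the grain logic inside `project_manifest_entry`, so that nothing from `semantic_layer` or pydantic needs to be installed.

```python
from __future__ import annotations

# Trimmed stand-in for the module's _TYPE_MAP (the real map has many more keys).
_TYPE_MAP = {
    "integer": "number",
    "numeric": "number",
    "timestamp": "time",
    "timestamp with time zone": "time",
    "boolean": "boolean",
}
_DEFAULT_PRIORITY = ["user", "ai", "dbt", "db"]


def map_column_type(db_type: str) -> str:
    # Lowercase, drop everything from the first "(" onwards, then look up.
    normalized = db_type.lower().split("(")[0].strip()
    return _TYPE_MAP.get(normalized, "string")


def resolve_description(descriptions: dict[str, str] | None) -> str | None:
    # Mirrors _resolve_description: priority order first, then any non-empty text.
    if descriptions:
        for source in _DEFAULT_PRIORITY:
            if text := descriptions.get(source):
                return text
        for text in descriptions.values():
            if text:
                return text
    return None


def derive_grain(columns: list[tuple[str, bool]]) -> list[str]:
    # Hypothetical helper: columns are (name, is_pk) pairs.
    # PK columns form the grain; with no PKs, every column does.
    pk_columns = [name for name, is_pk in columns if is_pk]
    return pk_columns if pk_columns else [name for name, _ in columns]


print(map_column_type("NUMERIC(10, 2)"))               # number
print(map_column_type("VARCHAR(255)"))                 # string (fallback)
print(map_column_type("timestamp(3) with time zone"))  # time (cut at "(" -> "timestamp")
print(resolve_description({"db": "raw comment", "ai": "generated"}))  # generated
print(resolve_description({"metabase": "from BI"}))    # from BI (fallback source)
print(derive_grain([("id", True), ("name", False)]))   # ['id']
print(derive_grain([("a", False), ("b", False)]))      # ['a', 'b']
```

Because the normalization cuts at the first `(`, a parameterized type such as `timestamp(3) with time zone` is looked up as plain `timestamp`, so the precision-qualified spelling still lands in the time family as long as the bare name is in the map.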