mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
* fix(context): merge overlay columns onto manifest columns by name composeOverlay was appending overlay columns to the manifest column list, producing duplicate entries when dbt/metabase overlays declared a column just to attach descriptions. The duplicates carried no `type`, so the pydantic SourceDefinition rejected them at semantic-query time and broke `ktx sl query` for every overlay-backed measure. Now overlay columns match base columns by name (case-insensitive): same-name entries merge onto the manifest (overlay fields win, type/role fall back to the base, descriptions merge per source key) and only new names append. * refactor(sl): split overlay columns from column_overrides and enforce TS/Python wire contract Overlay sources now have two distinct collections: `columns:` for computed columns (requiring `expr` + `type`) and `column_overrides:` for metadata patches to inherited manifest columns. Composing or loading an overlay that mixes the two — or references an unknown column — fails with a typed error. Introduce `ResolvedSemanticLayerSource` / `resolvedSourceSchema` / `toResolvedWire` as the strict shape sent to the Python engine, and add a schema contract test that diffs Zod against the Pydantic JSON schema dumped by `python -m semantic_layer dump-schema`. `SourceDefinition` is now `extra="forbid"` on the Python side. `loadAllSources` surfaces per-file load errors instead of swallowing them, so validation/query paths can report manifest shard parse failures. * fix(context): make scan description generation resilient and quiet A transient sampleTable failure during ingest used to take out every table in a connection: generateTableDescription returned a hardcoded 'Table not found' string into descriptions.ai, and KtxDescriptionGenerator was constructed without a logger, so the failure left no trail anywhere. - sampleTable / sampleColumn calls retry 3x with 200/400/800ms backoff, honouring KtxScanContext.signal via a new KtxAbortedError. 
- On retry exhaustion or missing capability, table generation falls back to a metadata-only prompt built from column name / native type / comment / rawDescriptions. The column path follows the same rule -- call the LLM when any of samples or rawDescriptions are available; skip only when both are absent. - Logger is now threaded from KtxScanContext into the generator. Failures emit structured KtxScanWarning entries (new description_fallback_used code, plus existing sampling_failed / enrichment_failed / connector_capability_missing). ktx scan groups warnings by code so a batch of identical failures collapses to one summary line plus sample. - Returns null on failure instead of the 'Table not found' sentinel; the manifest writer's existing guard already skips empty descriptions, so schema YAML no longer carries misleading text. SCAN_MANAGED_DESCRIPTION_KEYS already strips stale 'ai' on merge, so existing YAML clears on next run. Also suppress AI SDK v6 'system in messages' warning: pull system messages out of KtxMessageBuilder.wrapSimple's output via a new splitKtxSystemMessages helper and pass them top-level to generateText (preserves cacheControl providerOptions on the SystemModelMessage). Agent-runner's local splitSystemPromptMessages dedupes onto the shared helper. * test(docs): align examples-docs assertions with revamped docs PR #103 (setup/guide doc revamp) reworded several CLI examples and connection labels; the assertions in scripts/examples-docs.test.mjs still referenced the pre-revamp wording and were failing in CI on main. 
Update the regexes to match the post-revamp content: - drop the `--json` flag from the sl-query example expectation - move the `Driver:` / `Status: ok` probe to the connection reference, which is where that output now lives (driver id is lowercase `postgres`, not the display name `PostgreSQL`) - drop the obsolete `Install \`uv\`...` troubleshooting line - accept `<connectionId>` everywhere; the docs no longer use the hyphenated `<connection-id>` form - match the `warehouse` connection id used in the quickstart instead of the `postgres-warehouse` id only used in the README and setup ref * fix(sl): skip TS/Python schema contract test when uv is unavailable The TypeScript checks CI job does not install uv or Python, so the module-level `execFileSync('uv', ...)` in schemas.contract.test.ts threw ENOENT and failed the suite. Wrap the schema dump in a try/catch and guard the describe block with `describe.skipIf` so the test skips in environments without uv. Local dev and any CI job that has uv on PATH still runs the cross-language contract assertion.
719 lines
26 KiB
Python
719 lines
26 KiB
Python
"""Tests for manifest models, projection, overlay validation, and two-tier loading."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
import yaml
|
|
|
|
from semantic_layer.loader import SourceLoader
|
|
from semantic_layer.manifest import (
|
|
ManifestColumn,
|
|
ManifestEntry,
|
|
ManifestJoin,
|
|
map_column_type,
|
|
project_manifest_entry,
|
|
validate_overlay,
|
|
)
|
|
from semantic_layer.models import ColumnRole
|
|
|
|
|
|
# ── Type Mapping Tests ──────────────────────────────────────────────
|
|
|
|
|
|
class TestMapColumnType:
    """Checks how ``map_column_type`` buckets native database type names.

    Every native type is its own parametrized case, so one failing type is
    reported individually instead of hiding the types listed after it.
    """

    @pytest.mark.parametrize(
        "db_type",
        [
            "integer",
            "bigint",
            "smallint",
            "numeric",
            "decimal",
            "float",
            "double",
            "real",
            "int",
            "int2",
            "int4",
            "int8",
            "float4",
            "float8",
            "double precision",
            "number",
            "tinyint",
            "mediumint",
        ],
    )
    def test_map_column_type_numbers(self, db_type: str):
        """Numeric native types are expected to map to ``"number"``."""
        assert map_column_type(db_type) == "number", (
            f"{db_type} should map to 'number'"
        )

    @pytest.mark.parametrize(
        "db_type",
        [
            "timestamp",
            "timestamptz",
            "timestamp with time zone",
            "timestamp without time zone",
            "TIMESTAMP_NTZ",
            "TIMESTAMP_LTZ",
            "TIMESTAMP_TZ",
            "datetime",
            "date",
            "time",
            "timetz",
        ],
    )
    def test_map_column_type_time(self, db_type: str):
        """Date/time native types (mixed case included) are expected to map to ``"time"``."""
        assert map_column_type(db_type) == "time", f"{db_type} should map to 'time'"

    @pytest.mark.parametrize("db_type", ["boolean", "bool"])
    def test_map_column_type_boolean(self, db_type: str):
        """Boolean native types are expected to map to ``"boolean"``."""
        assert map_column_type(db_type) == "boolean", (
            f"{db_type} should map to 'boolean'"
        )

    @pytest.mark.parametrize(
        "db_type", ["varchar", "text", "char", "unknown", "jsonb", "xml"]
    )
    def test_map_column_type_string_fallback(self, db_type: str):
        """Textual and otherwise unlisted types are expected to map to ``"string"``."""
        assert map_column_type(db_type) == "string", (
            f"{db_type} should map to 'string'"
        )

    @pytest.mark.parametrize(
        ("db_type", "expected"),
        [
            ("numeric(10,2)", "number"),
            ("varchar(255)", "string"),
            ("decimal(18,4)", "number"),
            ("timestamp(6)", "time"),
            ("char(1)", "string"),
        ],
    )
    def test_map_column_type_strips_precision(self, db_type: str, expected: str):
        """A parenthesised precision/length suffix must not change the mapping."""
        assert map_column_type(db_type) == expected
|
|
|
|
|
|
# ── Manifest Projection Tests ──────────────────────────────────────
|
|
|
|
|
|
class TestProjectManifestEntry:
    """Exercises ``project_manifest_entry`` against hand-built manifest entries."""

    @pytest.fixture()
    def orders_entry(self) -> ManifestEntry:
        """Build an ``orders`` entry with five columns (``id`` is the PK) and one join."""
        pk_column = ManifestColumn(name="id", type="integer", pk=True)
        # The remaining columns deliberately leave ``pk`` at its model default.
        plain_columns = [
            ManifestColumn(name=col_name, type=native_type)
            for col_name, native_type in (
                ("customer_id", "integer"),
                ("total", "numeric"),
                ("status", "varchar"),
                ("created_at", "timestamp"),
            )
        ]
        customers_join = ManifestJoin(
            to="customers",
            on="orders.customer_id = customers.id",
            relationship="many_to_one",
            source="formal",
        )
        return ManifestEntry(
            table="public.orders",
            descriptions={"user": "Customer orders"},
            columns=[pk_column, *plain_columns],
            joins=[customers_join],
        )

    def test_project_manifest_entry_basic(self, orders_entry: ManifestEntry):
        """Name, table, description, column order and empty measures carry over."""
        projected = project_manifest_entry("orders", orders_entry)

        assert projected.name == "orders"
        assert projected.table == "public.orders"
        assert projected.description == "Customer orders"
        assert len(projected.columns) == 5
        assert projected.measures == []

        ordered_names = [col.name for col in projected.columns]
        assert ordered_names == ["id", "customer_id", "total", "status", "created_at"]

    def test_project_manifest_entry_type_mapping(self, orders_entry: ManifestEntry):
        """Native column types come out as their semantic type names."""
        projected = project_manifest_entry("orders", orders_entry)
        type_by_name = {col.name: col.type for col in projected.columns}

        expected_types = {
            "id": "number",
            "customer_id": "number",
            "total": "number",
            "status": "string",
            "created_at": "time",
        }
        for col_name, semantic_type in expected_types.items():
            assert type_by_name[col_name] == semantic_type

    def test_project_manifest_entry_grain_from_pk(self, orders_entry: ManifestEntry):
        """With a PK column present, the projected grain is ``["id"]``."""
        projected = project_manifest_entry("orders", orders_entry)
        assert projected.grain == ["id"]

    def test_project_manifest_entry_grain_all_columns_no_pk(self):
        """Without any PK column, the projected grain lists every column in order."""
        events_entry = ManifestEntry(
            table="public.events",
            columns=[
                ManifestColumn(name="user_id", type="integer"),
                ManifestColumn(name="event_type", type="varchar"),
                ManifestColumn(name="ts", type="timestamp"),
            ],
        )

        projected = project_manifest_entry("events", events_entry)

        assert projected.grain == ["user_id", "event_type", "ts"]

    def test_project_manifest_entry_joins_stripped(self, orders_entry: ManifestEntry):
        """The join keeps ``to``/``on``/``relationship`` but exposes no ``source`` value."""
        projected = project_manifest_entry("orders", orders_entry)
        assert len(projected.joins) == 1

        only_join = projected.joins[0]
        assert only_join.to == "customers"
        assert only_join.on == "orders.customer_id = customers.id"
        assert only_join.relationship == "many_to_one"
        # Passes both when the attribute is absent and when it is present but None.
        assert getattr(only_join, "source", None) is None

    def test_project_manifest_entry_time_role(self, orders_entry: ManifestEntry):
        """Only ``created_at`` gets the TIME role; the other four stay DEFAULT."""
        projected = project_manifest_entry("orders", orders_entry)

        time_names = [
            col.name for col in projected.columns if col.role == ColumnRole.TIME
        ]
        assert time_names == ["created_at"]

        default_count = sum(
            1 for col in projected.columns if col.role == ColumnRole.DEFAULT
        )
        assert default_count == 4

    def test_project_manifest_entry_preserves_dbt_metadata(self):
        """dbt constraints, enum values, tests, tags and freshness survive projection."""
        status_column = ManifestColumn(
            name="status",
            type="varchar",
            constraints={"dbt": {"not_null": True}},
            enum_values={"dbt": ["placed", "shipped"]},
            tests={"dbt": [{"name": "accepted_values", "package": "dbt"}]},
        )
        entry = ManifestEntry(
            table="public.orders",
            columns=[status_column],
            tags={"dbt": ["mart"]},
            freshness={"dbt": {"loaded_at_field": "updated_at"}},
        )

        projected = project_manifest_entry("orders", entry)
        status = projected.columns[0]

        assert status.constraints is not None
        assert status.constraints["dbt"].not_null is True
        assert status.enum_values == {"dbt": ["placed", "shipped"]}
        assert status.tests is not None
        dumped_tests = status.tests.model_dump(mode="python", exclude_none=True)
        assert dumped_tests == {"dbt": [{"name": "accepted_values", "package": "dbt"}]}
        assert projected.tags == {"dbt": ["mart"]}
        assert projected.freshness is not None
        assert projected.freshness["dbt"].loaded_at_field == "updated_at"
|
|
|
|
|
|
# ── Overlay Validation Tests ───────────────────────────────────────
|
|
|
|
|
|
class TestValidateOverlay:
    """Checks the error list ``validate_overlay`` returns for overlay payloads."""

    def test_validate_overlay_valid(self):
        """An overlay using every supported section yields no errors."""
        overlay = {
            "name": "orders",
            "descriptions": {"user": "Revenue-bearing orders"},
            "grain": ["id"],
            "measures": [{"name": "revenue", "expr": "sum(total)"}],
            "column_overrides": [
                {"name": "status", "descriptions": {"user": "Order lifecycle status"}}
            ],
            "columns": [
                {"name": "is_high_value", "expr": "total > 1000", "type": "boolean"}
            ],
            "exclude_columns": ["status"],
        }

        problems = validate_overlay(overlay, {"status", "total"})

        assert problems == []

    def test_validate_overlay_rejects_table(self):
        """A ``table`` key in an overlay produces one error that mentions it."""
        problems = validate_overlay({"name": "orders", "table": "public.orders"})

        assert len(problems) == 1
        assert "table" in problems[0].lower()

    def test_validate_overlay_rejects_sql(self):
        """A ``sql`` key in an overlay produces one error that mentions it."""
        problems = validate_overlay({"name": "orders", "sql": "SELECT * FROM orders"})

        assert len(problems) == 1
        assert "sql" in problems[0].lower()

    def test_validate_overlay_rejects_column_without_expr(self):
        """An overlay column lacking ``expr`` produces one error that mentions ``expr``."""
        overlay = {
            "name": "orders",
            "columns": [{"name": "status", "type": "string"}],
        }

        problems = validate_overlay(overlay)

        assert len(problems) == 1
        assert "expr" in problems[0].lower()

    def test_validate_overlay_allows_type_with_expr(self):
        """An overlay column carrying both ``type`` and ``expr`` is accepted."""
        overlay = {
            "name": "orders",
            "columns": [{"name": "is_big", "type": "boolean", "expr": "total > 1000"}],
        }

        problems = validate_overlay(overlay)

        assert problems == []

    def test_validate_overlay_rejects_column_override_structural_fields(self):
        """``description``, ``type`` and ``expr`` on an override each raise an error."""
        override = {
            "name": "status",
            "description": "Status",
            "type": "string",
            "expr": "status",
        }
        overlay = {"name": "orders", "column_overrides": [override]}

        problems = validate_overlay(overlay, {"status"})

        assert len(problems) == 3
        # The errors are expected in this exact order.
        for fragment, message in zip(("descriptions", "type", "expr"), problems):
            assert fragment in message

    def test_validate_overlay_rejects_unknown_column_override(self):
        """An override naming a column outside the known set is reported."""
        overlay = {
            "name": "orders",
            "column_overrides": [{"name": "missing", "descriptions": {"user": "Nope"}}],
        }

        problems = validate_overlay(overlay, {"status"})

        assert len(problems) == 1
        assert "does not match" in problems[0]
|
|
|
|
|
|
# ── Two-Tier Loading Tests ─────────────────────────────────────────
|
|
|
|
|
|
def _write_yaml(path: Path, data: dict | list) -> None:
    """Write ``data`` to ``path`` as block-style YAML.

    Missing parent directories are created first, and an existing file at
    ``path`` is overwritten.

    Args:
        path: Destination file.
        data: Mapping or list handed to ``yaml.dump``.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    # Pin the encoding so fixture files do not depend on the platform locale.
    with path.open("w", encoding="utf-8") as handle:
        yaml.dump(data, handle, default_flow_style=False)
|
|
|
|
|
|
def _manifest_tables() -> dict:
|
|
"""Manifest shard with orders + customers tables."""
|
|
return {
|
|
"tables": {
|
|
"orders": {
|
|
"table": "public.orders",
|
|
"descriptions": {"user": "Customer orders"},
|
|
"columns": [
|
|
{"name": "id", "type": "integer", "pk": True},
|
|
{"name": "customer_id", "type": "integer"},
|
|
{"name": "total", "type": "numeric"},
|
|
{"name": "status", "type": "varchar"},
|
|
{"name": "created_at", "type": "timestamp"},
|
|
],
|
|
"joins": [
|
|
{
|
|
"to": "customers",
|
|
"on": "orders.customer_id = customers.id",
|
|
"relationship": "many_to_one",
|
|
"source": "formal",
|
|
},
|
|
],
|
|
},
|
|
"customers": {
|
|
"table": "public.customers",
|
|
"descriptions": {"user": "Customer accounts"},
|
|
"columns": [
|
|
{"name": "id", "type": "integer", "pk": True},
|
|
{"name": "name", "type": "varchar"},
|
|
],
|
|
"joins": [
|
|
{
|
|
"to": "orders",
|
|
"on": "customers.id = orders.customer_id",
|
|
"relationship": "one_to_many",
|
|
"source": "formal",
|
|
},
|
|
],
|
|
},
|
|
},
|
|
}
|
|
|
|
|
|
class TestTwoTierLoading:
    """Covers ``SourceLoader.load_all`` over manifest shards, standalone files and overlays."""

    @staticmethod
    def _orders_overlay_loader(
        root: Path, overlay: dict, manifest: dict | None = None
    ) -> SourceLoader:
        """Stage the usual overlay fixture under ``root`` and return its loader.

        Writes ``_schema/public.yaml`` (``manifest``, or ``_manifest_tables()``
        when omitted), ``orders.yaml`` holding ``overlay``, and a name-only
        ``customers.yaml``. The returned ``SourceLoader`` has not loaded yet,
        so callers can wrap ``load_all()`` in ``pytest.raises``.
        """
        shard = _manifest_tables() if manifest is None else manifest
        _write_yaml(root / "_schema" / "public.yaml", shard)
        _write_yaml(root / "orders.yaml", overlay)
        # Customers overlay (empty, just name match); the original tests note
        # it is there to avoid a cross-ref error.
        _write_yaml(root / "customers.yaml", {"name": "customers"})
        return SourceLoader(root)

    def test_load_manifest_shard(self, tmp_path: Path):
        """Tables in a ``_schema`` shard load as sources without any overlay file."""
        _write_yaml(tmp_path / "_schema" / "public.yaml", _manifest_tables())

        loaded = SourceLoader(tmp_path).load_all()

        assert "orders" in loaded
        assert "customers" in loaded
        assert loaded["orders"].table == "public.orders"
        assert loaded["orders"].grain == ["id"]
        assert loaded["customers"].table == "public.customers"

    def test_load_standalone_source(self, tmp_path: Path):
        """A YAML file declaring its own ``table`` loads as a table source."""
        _write_yaml(
            tmp_path / "regions.yaml",
            {
                "name": "regions",
                "table": "public.regions",
                "grain": ["id"],
                "columns": [
                    {"name": "id", "type": "number"},
                    {"name": "name", "type": "string"},
                ],
            },
        )

        loaded = SourceLoader(tmp_path).load_all()

        assert "regions" in loaded
        regions = loaded["regions"]
        assert regions.table == "public.regions"
        assert regions.is_table_source

    def test_overlay_descriptions_do_not_promote_base_map_to_user_source(
        self, tmp_path: Path
    ):
        """An overlay ``dbt`` description wins over the standalone's ``ai`` one."""
        _write_yaml(
            tmp_path / "a_regions.yaml",
            {
                "name": "regions",
                "descriptions": {"ai": "Standalone description"},
                "table": "public.regions",
                "grain": ["id"],
                "columns": [
                    {"name": "id", "type": "number"},
                ],
            },
        )
        _write_yaml(
            tmp_path / "z_regions_overlay.yaml",
            {"name": "regions", "descriptions": {"dbt": "dbt description"}},
        )

        loaded = SourceLoader(tmp_path).load_all()

        assert loaded["regions"].description == "dbt description"

    def test_load_sql_source(self, tmp_path: Path):
        """A YAML file declaring ``sql`` loads as a SQL source with its query kept."""
        _write_yaml(
            tmp_path / "active_users.yaml",
            {
                "name": "active_users",
                "sql": "SELECT id, email FROM users WHERE active = true",
                "grain": ["id"],
                "columns": [
                    {"name": "id", "type": "number"},
                    {"name": "email", "type": "string"},
                ],
            },
        )

        loaded = SourceLoader(tmp_path).load_all()

        assert "active_users" in loaded
        active_users = loaded["active_users"]
        assert active_users.is_sql_source
        assert "SELECT" in active_users.sql

    def test_load_overlay_composition(self, tmp_path: Path):
        """An overlay adds description and measures on top of the manifest table."""
        overlay = {
            "name": "orders",
            "descriptions": {"user": "Revenue-bearing orders"},
            "grain": ["id"],
            "measures": [{"name": "revenue", "expr": "sum(total)"}],
        }

        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()

        orders = loaded["orders"]
        assert orders.table == "public.orders"
        assert orders.description == "Revenue-bearing orders"
        assert len(orders.measures) == 1
        assert orders.measures[0].name == "revenue"

    def test_overlay_description_map_override(self, tmp_path: Path):
        """An overlay ``user`` description replaces the manifest's ``user`` one."""
        overlay = {"name": "orders", "descriptions": {"user": "Overridden description"}}

        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()

        assert loaded["orders"].description == "Overridden description"

    def test_overlay_descriptions_map_preserves_higher_priority_manifest_description(
        self, tmp_path: Path
    ):
        """Overlay ``db``/``dbt`` descriptions do not displace the manifest ``user`` one."""
        overlay = {
            "name": "orders",
            "descriptions": {
                "db": "DB description",
                "dbt": "dbt description",
            },
        }

        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()

        assert loaded["orders"].description == "Customer orders"

    def test_overlay_descriptions_map_overrides_lower_priority_db_source(
        self, tmp_path: Path
    ):
        """An overlay ``dbt`` description wins when the manifest only has ``db``."""
        db_only_manifest = {
            "tables": {
                "orders": {
                    "table": "public.orders",
                    "descriptions": {"db": "DB description"},
                    "columns": [{"name": "id", "type": "integer", "pk": True}],
                },
                "customers": {
                    "table": "public.customers",
                    "columns": [{"name": "id", "type": "integer", "pk": True}],
                },
            }
        }
        overlay = {
            "name": "orders",
            "descriptions": {
                "dbt": "dbt description",
            },
        }

        loader = self._orders_overlay_loader(
            tmp_path, overlay, manifest=db_only_manifest
        )
        loaded = loader.load_all()

        assert loaded["orders"].description == "dbt description"

    def test_overlay_exclude_columns(self, tmp_path: Path):
        """``exclude_columns`` drops the named column and leaves the others."""
        overlay = {"name": "orders", "exclude_columns": ["status"]}

        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()

        remaining = [col.name for col in loaded["orders"].columns]
        assert "status" not in remaining
        assert "id" in remaining
        assert "total" in remaining

    def test_overlay_computed_columns_appended(self, tmp_path: Path):
        """An overlay computed column shows up alongside the manifest columns."""
        overlay = {
            "name": "orders",
            "columns": [
                {"name": "is_high_value", "expr": "total > 1000", "type": "boolean"},
            ],
        }

        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()
        orders_columns = loaded["orders"].columns

        present = [col.name for col in orders_columns]
        assert "is_high_value" in present
        # Original columns still present
        assert "id" in present
        assert "total" in present
        # The computed column keeps its declared expression and type.
        high_value = next(col for col in orders_columns if col.name == "is_high_value")
        assert high_value.expr == "total > 1000"
        assert high_value.type == "boolean"

    def test_overlay_column_overrides_patch_manifest_columns(self, tmp_path: Path):
        """A column override attaches descriptions while the manifest type is kept."""
        overlay = {
            "name": "orders",
            "column_overrides": [
                {"name": "status", "descriptions": {"user": "Order lifecycle status"}}
            ],
        }

        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()

        status = next(col for col in loaded["orders"].columns if col.name == "status")
        assert status.type == "string"
        assert status.description == "Order lifecycle status"
        assert status.descriptions == {"user": "Order lifecycle status"}

    def test_overlay_rejects_unknown_column_override(self, tmp_path: Path):
        """Overriding a column the manifest lacks fails the load with ``ValueError``."""
        overlay = {
            "name": "orders",
            "column_overrides": [
                {"name": "missing", "descriptions": {"user": "No such column"}}
            ],
        }
        loader = self._orders_overlay_loader(tmp_path, overlay)

        with pytest.raises(ValueError, match="Column override 'missing'"):
            loader.load_all()

    def test_overlay_rejects_computed_column_name_collision(self, tmp_path: Path):
        """A computed column reusing a manifest column name fails the load."""
        overlay = {
            "name": "orders",
            "columns": [{"name": "status", "type": "string", "expr": "status"}],
        }
        loader = self._orders_overlay_loader(tmp_path, overlay)

        with pytest.raises(ValueError, match="move it to 'column_overrides:'"):
            loader.load_all()

    def test_overlay_rejects_exclude_override_conflict(self, tmp_path: Path):
        """Excluding and overriding the same column fails the load."""
        overlay = {
            "name": "orders",
            "exclude_columns": ["status"],
            "column_overrides": [
                {"name": "status", "descriptions": {"user": "Hidden status"}}
            ],
        }
        loader = self._orders_overlay_loader(tmp_path, overlay)

        with pytest.raises(ValueError, match="conflict with exclude_columns"):
            loader.load_all()

    def test_overlay_measures_set(self, tmp_path: Path):
        """Both overlay measures end up on the composed source."""
        overlay = {
            "name": "orders",
            "measures": [
                {"name": "revenue", "expr": "sum(total)"},
                {"name": "order_count", "expr": "count(id)"},
            ],
        }

        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()
        measures = loaded["orders"].measures

        assert len(measures) == 2
        assert {measure.name for measure in measures} == {"revenue", "order_count"}

    def test_overlay_grain_override(self, tmp_path: Path):
        """An overlay ``grain`` replaces the grain taken from the manifest."""
        overlay = {"name": "orders", "grain": ["id", "customer_id"]}

        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()

        assert loaded["orders"].grain == ["id", "customer_id"]

    def test_overlay_join_union_and_dedupe(self, tmp_path: Path):
        """Overlay joins are unioned with manifest joins; an identical one is not doubled."""
        # Add a "regions" standalone so the join target exists
        _write_yaml(
            tmp_path / "regions.yaml",
            {
                "name": "regions",
                "table": "public.regions",
                "grain": ["id"],
                "columns": [
                    {"name": "id", "type": "number"},
                    {"name": "name", "type": "string"},
                ],
            },
        )
        overlay = {
            "name": "orders",
            "joins": [
                # Duplicate of manifest join (should be deduped)
                {
                    "to": "customers",
                    "on": "orders.customer_id = customers.id",
                    "relationship": "many_to_one",
                },
                # New join
                {
                    "to": "regions",
                    "on": "orders.region_id = regions.id",
                    "relationship": "many_to_one",
                },
            ],
        }

        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()

        composed_joins = loaded["orders"].joins
        # Manifest had 1 join to customers, overlay adds 1 new (regions), duplicate deduped
        assert len(composed_joins) == 2
        targets = [join.to for join in composed_joins]
        assert "customers" in targets
        assert "regions" in targets

    def test_overlay_disable_joins(self, tmp_path: Path):
        """``disable_joins`` removes the manifest join with the matching ``on`` clause."""
        overlay = {
            "name": "orders",
            "disable_joins": ["orders.customer_id = customers.id"],
        }

        # Per the original test's note: customers still needs to exist since the
        # customers manifest entry has a join back to orders that is NOT disabled.
        # The helper writes that customers overlay.
        loaded = self._orders_overlay_loader(tmp_path, overlay).load_all()

        assert len(loaded["orders"].joins) == 0

    def test_overlay_rejects_invalid(self, tmp_path: Path):
        """An overlay that fails validation aborts the load with ``ValueError``."""
        # An overlay with a column that has type but no expr is invalid
        overlay = {
            "name": "orders",
            "columns": [{"name": "status", "type": "string"}],
        }
        loader = self._orders_overlay_loader(tmp_path, overlay)

        with pytest.raises(ValueError, match="Invalid overlay"):
            loader.load_all()
|