mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
299 lines
9.6 KiB
Python
299 lines
9.6 KiB
Python
from __future__ import annotations
|
|
|
|
import pytest
|
|
|
|
from semantic_layer.engine import SemanticEngine
|
|
from semantic_layer.models import (
|
|
JoinDeclaration,
|
|
SourceColumn,
|
|
SourceDefinition,
|
|
)
|
|
|
|
|
|
def _src(
    name: str,
    columns: list[str] | None = None,
    grain: list[str] | None = None,
    joins: list[JoinDeclaration] | None = None,
) -> SourceDefinition:
    """Build a small ``SourceDefinition`` for validator tests.

    Args:
        name: Source name; also used to derive the table as ``public.<name>``.
        columns: Column names, each created with type ``"number"``.
            Falls back to ``["id"]`` when omitted or empty.
        grain: Grain column names. Falls back to ``["id"]`` when omitted
            or empty.
        joins: Join declarations attached to the source. Falls back to a
            fresh empty list when omitted or empty.

    Returns:
        The assembled ``SourceDefinition``.
    """
    column_names = columns or ["id"]
    grain_columns = grain or ["id"]
    source_columns = [SourceColumn(name=col, type="number") for col in column_names]
    return SourceDefinition(
        name=name,
        table=f"public.{name}",
        grain=grain_columns,
        columns=source_columns,
        joins=joins or [],
    )
|
|
|
|
|
|
class TestValidatorValid:
    """Validation of a model whose only join points at a defined source."""

    def test_valid_connected_model(self):
        """``orders`` joined to a defined ``customers`` reports no issues."""
        customer_join = JoinDeclaration(
            to="customers",
            on="customer_id = customers.id",
            relationship="many_to_one",
        )
        sources = {
            "orders": _src(
                "orders",
                columns=["id", "customer_id"],
                joins=[customer_join],
            ),
            "customers": _src("customers"),
        }

        report = SemanticEngine.from_sources(sources).validate()

        assert report.valid
        assert report.errors == []
        assert report.warnings == []
|
|
|
|
|
|
class TestOrphanJoinTarget:
    """Behaviour when a join targets a source that was never registered."""

    def test_orphan_join_target_is_error(self):
        """Validation reports an error naming both ends of the orphan join."""
        dangling_join = JoinDeclaration(
            to="customers",
            on="customer_id = customers.id",
            relationship="many_to_one",
        )
        orders = _src(
            "orders",
            columns=["id", "customer_id"],
            joins=[dangling_join],
        )
        # `customers` deliberately not defined
        engine = SemanticEngine.from_sources({"orders": orders})

        report = engine.validate()

        assert not report.valid
        matching_errors = [
            err
            for err in report.errors
            if "orders" in err and "customers" in err and "not defined" in err
        ]
        assert matching_errors

    def test_query_with_orphan_target_raises_before_sql(self):
        """Query path must reject orphan targets, not silently emit SQL
        that references the undefined table name (which could read a real
        unmodeled table sharing that name)."""
        dangling_join = JoinDeclaration(
            to="customers",
            on="customer_id = customers.id",
            relationship="many_to_one",
        )
        orders = _src(
            "orders",
            columns=["id", "amount", "customer_id"],
            joins=[dangling_join],
        )
        engine = SemanticEngine.from_sources({"orders": orders})
        request = {
            "measures": ["sum(orders.amount)"],
            "dimensions": ["customers.id"],
        }

        with pytest.raises(ValueError) as exc:
            engine.query(request)

        # The message must identify the declaring source, the missing
        # target, and the reason.
        msg = str(exc.value)
        for fragment in ("orders", "customers", "not defined"):
            assert fragment in msg
|
|
|
|
|
|
class TestInvalidGrain:
    """Validation of a grain that references an undeclared column."""

    def test_grain_column_missing_from_columns(self):
        """The error must mention both the source and the missing column."""
        sources = {
            "bad": _src("bad", columns=["id"], grain=["nonexistent_col"]),
        }

        report = SemanticEngine.from_sources(sources).validate()

        assert not report.valid
        offending = [
            err
            for err in report.errors
            if "bad" in err and "nonexistent_col" in err
        ]
        assert offending
|
|
|
|
|
|
class TestDisconnectedComponents:
    """Warnings expected when the sources do not form one joined component."""

    def test_two_components_produce_warning_not_error(self):
        """Two unjoined sources stay valid but yield a disconnection warning."""
        a = _src("a")
        b = _src("b")
        engine = SemanticEngine.from_sources({"a": a, "b": b})

        report = engine.validate()

        assert report.valid
        assert report.errors == []
        assert len(report.warnings) >= 1
        disconnection = next(
            (w for w in report.warnings if "disconnected components" in w), None
        )
        assert disconnection is not None
        assert "2 disconnected components" in disconnection
        assert "Component 1" in disconnection
        assert "Component 2" in disconnection

    def test_aliases_do_not_create_false_disconnection(self):
        """Two aliases of the same base source must count as one component
        with the base, not as separate islands."""
        orders = SourceDefinition(
            name="orders",
            table="public.orders",
            grain=["id"],
            columns=[
                SourceColumn(name="id", type="number"),
                SourceColumn(name="amount", type="number"),
                SourceColumn(name="billing_customer_id", type="number"),
                SourceColumn(name="shipping_customer_id", type="number"),
            ],
            joins=[
                JoinDeclaration(
                    to="customers",
                    alias="billing_customer",
                    on="billing_customer_id = billing_customer.id",
                    relationship="many_to_one",
                ),
                JoinDeclaration(
                    to="customers",
                    alias="shipping_customer",
                    on="shipping_customer_id = shipping_customer.id",
                    relationship="many_to_one",
                ),
            ],
        )
        customers = _src("customers", columns=["id", "segment"])
        engine = SemanticEngine.from_sources({"orders": orders, "customers": customers})

        report = engine.validate()

        assert report.valid
        assert not any("disconnected components" in w for w in report.warnings)

    def test_large_component_is_truncated(self):
        """A ten-source chain plus one island: the warning abbreviates the chain."""
        many = {f"s{i}": _src(f"s{i}") for i in range(10)}
        # Join them sequentially so they form one big component
        for i in range(9):
            many[f"s{i}"].joins.append(
                JoinDeclaration(
                    to=f"s{i + 1}",
                    on=f"id = s{i + 1}.id",
                    relationship="many_to_one",
                )
            )
        many["island"] = _src("island")
        engine = SemanticEngine.from_sources(many)

        report = engine.validate()

        # Use a default so a missing warning fails as a readable assertion
        # rather than a bare StopIteration.
        disconnection = next(
            (w for w in report.warnings if "disconnected components" in w), None
        )
        assert disconnection is not None, (
            f"Expected a disconnected-components warning, got: {report.warnings}"
        )
        assert "(10 sources)" in disconnection
        assert "... (+8 more)" in disconnection
        assert "(1 sources): island" in disconnection

    def test_singleton_component_warning_names_recently_touched_source(self):
        """A recently touched source with no joins gets its own warning."""
        orders = _src(
            "orders",
            columns=["id", "customer_id"],
            joins=[
                JoinDeclaration(
                    to="customers",
                    on="customer_id = customers.id",
                    relationship="many_to_one",
                )
            ],
        )
        customers = _src("customers")
        lonely_source = _src("lonely_source")
        engine = SemanticEngine.from_sources(
            {
                "orders": orders,
                "customers": customers,
                "lonely_source": lonely_source,
            }
        )

        report = engine.validate(recently_touched={"lonely_source"})

        assert report.per_source_warnings["lonely_source"]
        msg = report.per_source_warnings["lonely_source"][0]
        assert "lonely_source" in msg
        # Either wording is accepted for the isolated-source message.
        assert "singleton" in msg.lower() or "no joins" in msg.lower()

    def test_no_per_source_warning_for_connected_recently_touched_source(self):
        """A recently touched source that is joined gets no per-source warning."""
        orders = _src(
            "orders",
            columns=["id", "customer_id"],
            joins=[
                JoinDeclaration(
                    to="customers",
                    on="customer_id = customers.id",
                    relationship="many_to_one",
                )
            ],
        )
        customers = _src("customers")
        engine = SemanticEngine.from_sources({"orders": orders, "customers": customers})

        report = engine.validate(recently_touched={"orders"})

        assert report.per_source_warnings.get("orders", []) == []

    def test_recently_touched_default_none_preserves_existing_behavior(self):
        """Without ``recently_touched`` only the global warning is produced."""
        lonely = _src("lonely")
        other = _src("other")
        engine = SemanticEngine.from_sources({"lonely": lonely, "other": other})

        report = engine.validate()

        assert any("disconnected components" in w for w in report.warnings)
        assert report.per_source_warnings == {}
|
|
|
|
|
|
class TestEcommerceSmoke:
    """Smoke check that the e-commerce sources validate without issues."""

    def test_ecommerce_fixtures_validate_cleanly(self, ecommerce_sources):
        """The sources passed in must produce no errors and no warnings.

        NOTE(review): ``ecommerce_sources`` is presumably a pytest fixture
        defined outside this file (e.g. in ``conftest.py``) - confirm.
        """
        report = SemanticEngine.from_sources(ecommerce_sources).validate()

        assert report.valid, f"Expected clean report, got errors: {report.errors}"
        assert report.warnings == [], f"Expected no warnings, got: {report.warnings}"
|
|
|
|
|
|
class TestMultipleIssuesCollected:
    """One validation run is expected to report several problems together."""

    def test_errors_and_warnings_coexist(self):
        """A bad grain, an orphan join and an isolated source are all reported."""
        missing_target_join = JoinDeclaration(
            to="doesnt_exist",
            on="fk = doesnt_exist.id",
            relationship="many_to_one",
        )
        sources = {
            "bad_grain": _src("bad_grain", columns=["id"], grain=["missing"]),
            "with_orphan": _src(
                "with_orphan",
                columns=["id", "fk"],
                joins=[missing_target_join],
            ),
            "isolated": _src("isolated"),
        }

        report = SemanticEngine.from_sources(sources).validate()

        assert not report.valid
        assert len(report.errors) >= 2
        # One error per defect: the unknown grain column and the unknown
        # join target.
        for needle in ("missing", "doesnt_exist"):
            assert any(needle in err for err in report.errors)
        assert len(report.warnings) >= 1
|