mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-07 07:55:13 +02:00
99 lines
4.1 KiB
Python
99 lines
4.1 KiB
Python
"""Detect semantically-redundant measure definitions on the same source."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import sqlglot
|
|
from sqlglot import exp
|
|
|
|
from semantic_layer.models import SourceDefinition
|
|
from semantic_layer.parser import quote_reserved_identifiers
|
|
|
|
# DIALECT CONVENTION:
|
|
# Measure `expr` values are compared structurally. They must be parsed with
|
|
# the connection's native dialect (per sl_capture); parsing as postgres
|
|
# would drop dialect-specific tokens and miss duplicates across BigQuery
|
|
# `SAFE_DIVIDE` / Snowflake `DIV0` etc.
|
|
|
|
|
|
def validate_measure_duplicates(
|
|
sources: dict[str, SourceDefinition],
|
|
*,
|
|
dialect: str = "postgres",
|
|
) -> list[str]:
|
|
"""
|
|
Flag pairs of measures on the same source whose `expr` is structurally
|
|
equivalent. Intended to prevent capture-time churn like:
|
|
|
|
- name: active_subscription_count
|
|
expr: count(*)
|
|
filter: is_active = true
|
|
- name: new_subscription_count
|
|
expr: count(*) # same base aggregation — should be query-time filter
|
|
|
|
Returns a list of human-readable error strings (empty list = no duplicates).
|
|
Compares every pair of measures within a single source; does not compare
|
|
across sources (measures on different sources are never redundant).
|
|
"""
|
|
errors: list[str] = []
|
|
for source_name, source in sources.items():
|
|
if len(source.measures) < 2:
|
|
continue
|
|
|
|
parsed: list[tuple[str, exp.Expression | None, str | None, frozenset[str]]] = []
|
|
for m in source.measures:
|
|
try:
|
|
quoted = quote_reserved_identifiers(m.expr)
|
|
tree = sqlglot.parse_one(f"SELECT {quoted}", read=dialect)
|
|
expr_node = tree.expressions[0] if tree.expressions else None
|
|
except Exception:
|
|
# Unparseable expressions are left for the caller's normal
|
|
# validation to surface; don't block on parse failure here.
|
|
expr_node = None
|
|
parsed.append((m.name, expr_node, m.filter, frozenset(m.segments)))
|
|
|
|
for i, (name_a, expr_a, filter_a, segments_a) in enumerate(parsed):
|
|
if expr_a is None:
|
|
continue
|
|
for name_b, expr_b, filter_b, segments_b in parsed[i + 1 :]:
|
|
if expr_b is None:
|
|
continue
|
|
if not _expressions_equivalent(expr_a, expr_b):
|
|
continue
|
|
|
|
# Segments are named, reusable filter predicates; two measures
|
|
# sharing an expr but applying different segments are by design
|
|
# distinct and must not be flagged.
|
|
if segments_a != segments_b:
|
|
continue
|
|
|
|
fa = (filter_a or "").strip()
|
|
fb = (filter_b or "").strip()
|
|
if fa == fb:
|
|
errors.append(
|
|
f"{source_name}: measures '{name_a}' and '{name_b}' have the same "
|
|
f"expression and filter — remove one or differentiate them."
|
|
)
|
|
else:
|
|
errors.append(
|
|
f"{source_name}: measure '{name_b}' has the same expression as "
|
|
f"'{name_a}' — differs only by `filter`. Use query-time filtering "
|
|
f"on '{name_a}' (via semantic_query filters), or, if the filter "
|
|
f"encodes a named business segment, add a segments[] entry on this "
|
|
f"source and reference it instead."
|
|
)
|
|
return errors
|
|
|
|
|
|
def _expressions_equivalent(a: exp.Expression, b: exp.Expression) -> bool:
|
|
"""
|
|
Structural equality on sqlglot ASTs.
|
|
|
|
Normalizes via sqlglot's .sql() canonical form (handles whitespace, case,
|
|
aliasing). Does NOT reorder operands — `safe_divide(a, b)` is NOT equal to
|
|
`safe_divide(b, a)`, nor is `a - b` equal to `b - a`. This is deliberate:
|
|
the check's purpose is catching accidental redundancy, not proving
|
|
mathematical equivalence.
|
|
"""
|
|
if type(a) is not type(b):
|
|
return False
|
|
return a.sql(dialect="postgres") == b.sql(dialect="postgres")
|