mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
1057 lines
35 KiB
Python
1057 lines
35 KiB
Python
|
|
"""Parse LookML projects into resolved, KSL-ready structures.
|
||
|
|
|
||
|
|
Pipeline: parse files, collect constants, substitute constants, resolve
|
||
|
|
extends/refinements, resolve column references, and build measures/joins.
|
||
|
|
"""
|
||
|
|
|
||
|
|
from __future__ import annotations
|
||
|
|
|
||
|
|
import logging
|
||
|
|
import re
|
||
|
|
from typing import Any, Literal
|
||
|
|
|
||
|
|
import lkml
|
||
|
|
import sqlglot
|
||
|
|
from pydantic import BaseModel
|
||
|
|
from sqlglot import exp
|
||
|
|
|
||
|
|
logger = logging.getLogger(__name__)
|
||
|
|
|
||
|
|
# ── Public models ──────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
CONSTANT_RE = re.compile(r"@\{(\w+)\}")
|
||
|
|
TABLE_REF_RE = re.compile(r"^\s*\$\{TABLE\}\.(\w+)\s*$")
|
||
|
|
FIELD_REF_RE = re.compile(r"\$\{(\w+)\}")
|
||
|
|
VIEW_FIELD_REF_RE = re.compile(r"\$\{(\w+)\.(\w+)\}")
|
||
|
|
LIQUID_RE = re.compile(r"\{%.*?%\}")
|
||
|
|
AGGREGATE_FUNC_RE = re.compile(
|
||
|
|
r"\b(min|max|sum|avg|count|count_distinct|median)\s*\(", re.IGNORECASE
|
||
|
|
)
|
||
|
|
|
||
|
|
LOOKML_TYPE_MAP: dict[str, str] = {
|
||
|
|
"string": "string",
|
||
|
|
"tier": "string",
|
||
|
|
"zipcode": "string",
|
||
|
|
"location": "string",
|
||
|
|
"number": "number",
|
||
|
|
"yesno": "boolean",
|
||
|
|
"time": "time",
|
||
|
|
"date": "time",
|
||
|
|
"date_time": "time",
|
||
|
|
"date_raw": "time",
|
||
|
|
"date_date": "time",
|
||
|
|
"date_week": "time",
|
||
|
|
"date_month": "time",
|
||
|
|
"date_quarter": "time",
|
||
|
|
"date_year": "time",
|
||
|
|
"duration": "number",
|
||
|
|
}
|
||
|
|
|
||
|
|
MEASURE_TYPE_MAP: dict[str, str] = {
|
||
|
|
"count": "count(*)",
|
||
|
|
"count_distinct": "count_distinct",
|
||
|
|
"sum": "sum",
|
||
|
|
"sum_distinct": "sum",
|
||
|
|
"average": "avg",
|
||
|
|
"average_distinct": "avg",
|
||
|
|
"min": "min",
|
||
|
|
"max": "max",
|
||
|
|
"median": "median",
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
class LookMLFileInput(BaseModel):
|
||
|
|
path: str
|
||
|
|
content: str
|
||
|
|
|
||
|
|
|
||
|
|
class ParseLookMLRequest(BaseModel):
|
||
|
|
files: list[LookMLFileInput]
|
||
|
|
constant_overrides: dict[str, str] | None = None
|
||
|
|
dialect: str = "postgres"
|
||
|
|
|
||
|
|
|
||
|
|
class SkippedItem(BaseModel):
|
||
|
|
name: str
|
||
|
|
item_type: str
|
||
|
|
reason: str
|
||
|
|
|
||
|
|
|
||
|
|
class ParsedColumn(BaseModel):
|
||
|
|
name: str
|
||
|
|
lookml_name: str
|
||
|
|
type: str
|
||
|
|
role: str
|
||
|
|
visibility: str
|
||
|
|
description: str | None = None
|
||
|
|
is_computed: bool = False
|
||
|
|
expr: str | None = None
|
||
|
|
|
||
|
|
|
||
|
|
class ParsedMeasure(BaseModel):
|
||
|
|
name: str
|
||
|
|
expr: str
|
||
|
|
filter: str | None = None
|
||
|
|
description: str | None = None
|
||
|
|
|
||
|
|
|
||
|
|
class ParsedJoin(BaseModel):
|
||
|
|
source_view: str
|
||
|
|
to: str
|
||
|
|
alias: str | None = None
|
||
|
|
on: str
|
||
|
|
relationship: str
|
||
|
|
|
||
|
|
|
||
|
|
class ParsedLookMLView(BaseModel):
|
||
|
|
name: str
|
||
|
|
source_type: Literal["table", "sql"]
|
||
|
|
table_ref: str | None = None
|
||
|
|
sql: str | None = None
|
||
|
|
grain: list[str]
|
||
|
|
columns: list[ParsedColumn]
|
||
|
|
measures: list[ParsedMeasure]
|
||
|
|
description: str | None = None
|
||
|
|
skipped_dimensions: list[SkippedItem] = []
|
||
|
|
skipped_measures: list[SkippedItem] = []
|
||
|
|
|
||
|
|
|
||
|
|
class ParseLookMLResponse(BaseModel):
|
||
|
|
views: list[ParsedLookMLView]
|
||
|
|
joins: list[ParsedJoin]
|
||
|
|
skipped_views: list[SkippedItem]
|
||
|
|
warnings: list[str]
|
||
|
|
|
||
|
|
|
||
|
|
# ── Internal types ─────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
|
||
|
|
class _RawView:
|
||
|
|
"""Mutable intermediate view before resolution."""
|
||
|
|
|
||
|
|
def __init__(self, name: str, data: dict[str, Any], source_file: str) -> None:
|
||
|
|
self.name = name
|
||
|
|
self.source_file = source_file
|
||
|
|
self.extension_required: bool = data.get("extension") == "required"
|
||
|
|
self.is_refinement: bool = name.startswith("+")
|
||
|
|
self.sql_table_name: str | None = data.get("sql_table_name")
|
||
|
|
self.label: str | None = data.get("label")
|
||
|
|
self.description: str | None = data.get("description")
|
||
|
|
|
||
|
|
self.extends: list[str] = _flatten_all(data.get("extends__all"))
|
||
|
|
self.derived_table: dict[str, Any] | None = data.get("derived_table")
|
||
|
|
|
||
|
|
self.dimensions: dict[str, dict[str, Any]] = {
|
||
|
|
d["name"]: d for d in data.get("dimensions", [])
|
||
|
|
}
|
||
|
|
self.dimension_groups: dict[str, dict[str, Any]] = {
|
||
|
|
dg["name"]: dg for dg in data.get("dimension_groups", [])
|
||
|
|
}
|
||
|
|
self.measures: dict[str, dict[str, Any]] = {
|
||
|
|
m["name"]: m for m in data.get("measures", [])
|
||
|
|
}
|
||
|
|
|
||
|
|
|
||
|
|
# ── Main entry point ──────────────────────────────────────────────────
|
||
|
|
|
||
|
|
|
||
|
|
def parse_lookml_project(request: ParseLookMLRequest) -> ParseLookMLResponse:
|
||
|
|
"""Parse and resolve a LookML project into KSL-ready structures."""
|
||
|
|
constants = _collect_constants(request.files, request.constant_overrides or {})
|
||
|
|
raw_views, raw_explores = _parse_all_files(request.files, constants)
|
||
|
|
resolved = _resolve_inheritance(raw_views)
|
||
|
|
views, skipped_views, warnings = _build_parsed_views(resolved, request.dialect)
|
||
|
|
all_joins = _build_parsed_joins(raw_explores, resolved)
|
||
|
|
|
||
|
|
# Filter out joins that reference skipped views (as source or target)
|
||
|
|
emitted_view_names = {v.name for v in views}
|
||
|
|
joins = [
|
||
|
|
j
|
||
|
|
for j in all_joins
|
||
|
|
if j.source_view in emitted_view_names
|
||
|
|
and (j.to in emitted_view_names or j.alias)
|
||
|
|
]
|
||
|
|
|
||
|
|
return ParseLookMLResponse(
|
||
|
|
views=views,
|
||
|
|
joins=joins,
|
||
|
|
skipped_views=skipped_views,
|
||
|
|
warnings=warnings,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
# ── Parsing ────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
|
||
|
|
def _collect_constants(
|
||
|
|
files: list[LookMLFileInput], overrides: dict[str, str]
|
||
|
|
) -> dict[str, str]:
|
||
|
|
"""Extract constants from manifest.lkml and apply overrides."""
|
||
|
|
constants: dict[str, str] = {}
|
||
|
|
for f in files:
|
||
|
|
if "manifest" in f.path.lower():
|
||
|
|
parsed = lkml.load(f.content)
|
||
|
|
for c in parsed.get("constants", []):
|
||
|
|
constants[c["name"]] = c.get("value", "")
|
||
|
|
constants.update(overrides)
|
||
|
|
return constants
|
||
|
|
|
||
|
|
|
||
|
|
def _substitute_constants(text: str | None, constants: dict[str, str]) -> str | None:
|
||
|
|
if text is None:
|
||
|
|
return None
|
||
|
|
return CONSTANT_RE.sub(lambda m: constants.get(m.group(1), m.group(0)), text)
|
||
|
|
|
||
|
|
|
||
|
|
def _substitute_constants_in_view(
|
||
|
|
view_data: dict[str, Any], constants: dict[str, str]
|
||
|
|
) -> None:
|
||
|
|
"""In-place constant substitution across all SQL fields in a view dict."""
|
||
|
|
for key in ("sql_table_name", "sql"):
|
||
|
|
if key in view_data:
|
||
|
|
view_data[key] = _substitute_constants(view_data[key], constants)
|
||
|
|
|
||
|
|
dt = view_data.get("derived_table")
|
||
|
|
if dt and "sql" in dt:
|
||
|
|
dt["sql"] = _substitute_constants(dt["sql"], constants)
|
||
|
|
|
||
|
|
for dim in view_data.get("dimensions", []):
|
||
|
|
if "sql" in dim:
|
||
|
|
dim["sql"] = _substitute_constants(dim["sql"], constants)
|
||
|
|
for dg in view_data.get("dimension_groups", []):
|
||
|
|
if "sql" in dg:
|
||
|
|
dg["sql"] = _substitute_constants(dg["sql"], constants)
|
||
|
|
for m in view_data.get("measures", []):
|
||
|
|
if "sql" in m:
|
||
|
|
m["sql"] = _substitute_constants(m["sql"], constants)
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_all_files(
|
||
|
|
files: list[LookMLFileInput], constants: dict[str, str]
|
||
|
|
) -> tuple[dict[str, _RawView], list[dict[str, Any]]]:
|
||
|
|
"""Parse all .lkml files and return raw views + explores."""
|
||
|
|
all_views: dict[str, _RawView] = {}
|
||
|
|
all_explores: list[dict[str, Any]] = []
|
||
|
|
|
||
|
|
for f in files:
|
||
|
|
try:
|
||
|
|
parsed = lkml.load(f.content)
|
||
|
|
except Exception:
|
||
|
|
logger.warning("Failed to parse %s, skipping", f.path)
|
||
|
|
continue
|
||
|
|
|
||
|
|
for view_data in parsed.get("views", []):
|
||
|
|
_substitute_constants_in_view(view_data, constants)
|
||
|
|
name = view_data["name"]
|
||
|
|
rv = _RawView(name, view_data, f.path)
|
||
|
|
all_views[name] = rv
|
||
|
|
|
||
|
|
for explore_data in parsed.get("explores", []):
|
||
|
|
# Substitute constants in sql_on for joins
|
||
|
|
for j in explore_data.get("joins", []):
|
||
|
|
if "sql_on" in j:
|
||
|
|
j["sql_on"] = _substitute_constants(j["sql_on"], constants)
|
||
|
|
all_explores.append(explore_data)
|
||
|
|
|
||
|
|
return all_views, all_explores
|
||
|
|
|
||
|
|
|
||
|
|
# ── Inheritance resolution ─────────────────────────────────────────────
|
||
|
|
|
||
|
|
|
||
|
|
def _flatten_all(val: Any) -> list[str]:
|
||
|
|
"""Flatten lkml's nested extends__all / filters__all structures."""
|
||
|
|
if val is None:
|
||
|
|
return []
|
||
|
|
if isinstance(val, list):
|
||
|
|
result: list[str] = []
|
||
|
|
for item in val:
|
||
|
|
if isinstance(item, list):
|
||
|
|
result.extend(_flatten_all(item))
|
||
|
|
elif isinstance(item, str):
|
||
|
|
result.append(item)
|
||
|
|
return result
|
||
|
|
return []
|
||
|
|
|
||
|
|
|
||
|
|
def _merge_view(parent: _RawView, child: _RawView) -> None:
|
||
|
|
"""Merge parent fields into child (child takes precedence)."""
|
||
|
|
# Dimensions: parent first, child overrides
|
||
|
|
merged_dims = dict(parent.dimensions)
|
||
|
|
merged_dims.update(child.dimensions)
|
||
|
|
child.dimensions = merged_dims
|
||
|
|
|
||
|
|
merged_dgs = dict(parent.dimension_groups)
|
||
|
|
merged_dgs.update(child.dimension_groups)
|
||
|
|
child.dimension_groups = merged_dgs
|
||
|
|
|
||
|
|
merged_measures = dict(parent.measures)
|
||
|
|
merged_measures.update(child.measures)
|
||
|
|
child.measures = merged_measures
|
||
|
|
|
||
|
|
# Inherit sql_table_name and derived_table if child doesn't have them
|
||
|
|
if child.sql_table_name is None and parent.sql_table_name is not None:
|
||
|
|
child.sql_table_name = parent.sql_table_name
|
||
|
|
if child.derived_table is None and parent.derived_table is not None:
|
||
|
|
child.derived_table = parent.derived_table
|
||
|
|
if child.description is None and parent.description is not None:
|
||
|
|
child.description = parent.description
|
||
|
|
|
||
|
|
|
||
|
|
def _apply_refinement(target: _RawView, refinement: _RawView) -> None:
|
||
|
|
"""Apply refinement fields to the target view. Metadata-only merge for existing fields."""
|
||
|
|
for name, dim in refinement.dimensions.items():
|
||
|
|
if name in target.dimensions:
|
||
|
|
# Merge metadata: label, description, hidden, tags, group_label
|
||
|
|
for key in ("label", "description", "hidden", "tags", "group_label"):
|
||
|
|
if key in dim:
|
||
|
|
target.dimensions[name][key] = dim[key]
|
||
|
|
else:
|
||
|
|
target.dimensions[name] = dim
|
||
|
|
|
||
|
|
for name, dg in refinement.dimension_groups.items():
|
||
|
|
if name in target.dimension_groups:
|
||
|
|
for key in ("label", "description", "hidden", "tags", "group_label"):
|
||
|
|
if key in dg:
|
||
|
|
target.dimension_groups[name][key] = dg[key]
|
||
|
|
else:
|
||
|
|
target.dimension_groups[name] = dg
|
||
|
|
|
||
|
|
for name, m in refinement.measures.items():
|
||
|
|
if name in target.measures:
|
||
|
|
for key in ("label", "description", "hidden", "tags", "group_label"):
|
||
|
|
if key in m:
|
||
|
|
target.measures[name][key] = m[key]
|
||
|
|
else:
|
||
|
|
target.measures[name] = m
|
||
|
|
|
||
|
|
if refinement.label:
|
||
|
|
target.label = refinement.label
|
||
|
|
if refinement.description:
|
||
|
|
target.description = refinement.description
|
||
|
|
|
||
|
|
|
||
|
|
def _resolve_inheritance(raw_views: dict[str, _RawView]) -> dict[str, _RawView]:
|
||
|
|
"""Resolve extends and refinements. Returns only concrete views."""
|
||
|
|
# Separate refinements from regular views
|
||
|
|
refinements: list[_RawView] = []
|
||
|
|
views: dict[str, _RawView] = {}
|
||
|
|
|
||
|
|
for name, rv in raw_views.items():
|
||
|
|
if rv.is_refinement:
|
||
|
|
refinements.append(rv)
|
||
|
|
else:
|
||
|
|
views[name] = rv
|
||
|
|
|
||
|
|
# Resolve extends (topological order via iterative resolution)
|
||
|
|
resolved: set[str] = set()
|
||
|
|
max_passes = len(views) + 1
|
||
|
|
for _ in range(max_passes):
|
||
|
|
progress = False
|
||
|
|
for name, view in views.items():
|
||
|
|
if name in resolved:
|
||
|
|
continue
|
||
|
|
if not view.extends:
|
||
|
|
resolved.add(name)
|
||
|
|
progress = True
|
||
|
|
continue
|
||
|
|
# Check if all parents are resolved
|
||
|
|
parents_ready = all(p in resolved for p in view.extends)
|
||
|
|
if parents_ready:
|
||
|
|
for parent_name in view.extends:
|
||
|
|
parent = views.get(parent_name)
|
||
|
|
if parent:
|
||
|
|
_merge_view(parent, view)
|
||
|
|
resolved.add(name)
|
||
|
|
progress = True
|
||
|
|
if not progress:
|
||
|
|
break
|
||
|
|
|
||
|
|
# Apply refinements
|
||
|
|
for ref in refinements:
|
||
|
|
target_name = ref.name.lstrip("+")
|
||
|
|
target = views.get(target_name)
|
||
|
|
if target:
|
||
|
|
_apply_refinement(target, ref)
|
||
|
|
else:
|
||
|
|
logger.warning(
|
||
|
|
"Refinement target '%s' not found for '%s'", target_name, ref.name
|
||
|
|
)
|
||
|
|
|
||
|
|
return views
|
||
|
|
|
||
|
|
|
||
|
|
# ── Alias view detection ──────────────────────────────────────────────
|
||
|
|
|
||
|
|
|
||
|
|
def _detect_alias_views(resolved: dict[str, _RawView]) -> dict[str, str]:
|
||
|
|
"""Detect views that are aliases of another view (same table_ref, extends parent).
|
||
|
|
|
||
|
|
Returns dict of {alias_name: parent_name}.
|
||
|
|
"""
|
||
|
|
# Build table_ref → first view name map (canonical view per table)
|
||
|
|
table_to_canonical: dict[str, str] = {}
|
||
|
|
for name, rv in resolved.items():
|
||
|
|
if rv.extension_required or rv.is_refinement:
|
||
|
|
continue
|
||
|
|
if rv.sql_table_name and not rv.extends:
|
||
|
|
table_ref = rv.sql_table_name.strip()
|
||
|
|
if table_ref not in table_to_canonical:
|
||
|
|
table_to_canonical[table_ref] = name
|
||
|
|
|
||
|
|
# Find views that extend another, share the same table_ref,
|
||
|
|
# and add no new fields (pure aliases used only for join renaming).
|
||
|
|
alias_views: dict[str, str] = {}
|
||
|
|
for name, rv in resolved.items():
|
||
|
|
if rv.extension_required or rv.is_refinement:
|
||
|
|
continue
|
||
|
|
if rv.extends and rv.sql_table_name:
|
||
|
|
table_ref = rv.sql_table_name.strip()
|
||
|
|
canonical = table_to_canonical.get(table_ref)
|
||
|
|
if canonical and canonical != name and _is_pure_alias(rv, resolved):
|
||
|
|
alias_views[name] = canonical
|
||
|
|
|
||
|
|
return alias_views
|
||
|
|
|
||
|
|
|
||
|
|
def _is_pure_alias(rv: _RawView, resolved: dict[str, _RawView]) -> bool:
|
||
|
|
"""A view is a pure alias if it adds no new fields and does not override any
|
||
|
|
inherited field's definition. Compare dict values, not just names — a child
|
||
|
|
that redefines a measure/dimension must not be classified as alias-only."""
|
||
|
|
parent_dims: dict[str, dict] = {}
|
||
|
|
parent_dgs: dict[str, dict] = {}
|
||
|
|
parent_meas: dict[str, dict] = {}
|
||
|
|
for parent_name in rv.extends:
|
||
|
|
parent = resolved.get(parent_name)
|
||
|
|
if parent:
|
||
|
|
parent_dims.update(parent.dimensions)
|
||
|
|
parent_dgs.update(parent.dimension_groups)
|
||
|
|
parent_meas.update(parent.measures)
|
||
|
|
|
||
|
|
for name, d in rv.dimensions.items():
|
||
|
|
if parent_dims.get(name) != d:
|
||
|
|
return False
|
||
|
|
for name, d in rv.dimension_groups.items():
|
||
|
|
if parent_dgs.get(name) != d:
|
||
|
|
return False
|
||
|
|
for name, d in rv.measures.items():
|
||
|
|
if parent_meas.get(name) != d:
|
||
|
|
return False
|
||
|
|
return True
|
||
|
|
|
||
|
|
|
||
|
|
# ── View → ParsedLookMLView conversion ────────────────────────────────
|
||
|
|
|
||
|
|
|
||
|
|
def _build_parsed_views(
|
||
|
|
resolved: dict[str, _RawView],
|
||
|
|
dialect: str,
|
||
|
|
) -> tuple[list[ParsedLookMLView], list[SkippedItem], list[str]]:
|
||
|
|
views: list[ParsedLookMLView] = []
|
||
|
|
skipped: list[SkippedItem] = []
|
||
|
|
warnings: list[str] = []
|
||
|
|
|
||
|
|
# Detect aliased views: views that extend another and share the same table_ref.
|
||
|
|
# These are used only as join aliases (e.g., customer_nation extends nation).
|
||
|
|
alias_view_names = _detect_alias_views(resolved)
|
||
|
|
|
||
|
|
for name, rv in resolved.items():
|
||
|
|
# Skip abstract base views
|
||
|
|
if rv.extension_required:
|
||
|
|
skipped.append(
|
||
|
|
SkippedItem(
|
||
|
|
name=name,
|
||
|
|
item_type="view",
|
||
|
|
reason="abstract base view (extension: required)",
|
||
|
|
)
|
||
|
|
)
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Skip alias-only views (handled via join aliases)
|
||
|
|
if name in alias_view_names:
|
||
|
|
skipped.append(
|
||
|
|
SkippedItem(
|
||
|
|
name=name,
|
||
|
|
item_type="view",
|
||
|
|
reason=f"alias view (same table as '{alias_view_names[name]}', used as join alias)",
|
||
|
|
)
|
||
|
|
)
|
||
|
|
continue
|
||
|
|
|
||
|
|
# Determine source type
|
||
|
|
has_explore_source = rv.derived_table and "explore_source" in rv.derived_table
|
||
|
|
has_sql_dt = rv.derived_table and "sql" in rv.derived_table
|
||
|
|
has_table_ref = rv.sql_table_name is not None
|
||
|
|
|
||
|
|
if has_explore_source:
|
||
|
|
skipped.append(
|
||
|
|
SkippedItem(
|
||
|
|
name=name,
|
||
|
|
item_type="view",
|
||
|
|
reason="native derived table (explore_source not supported)",
|
||
|
|
)
|
||
|
|
)
|
||
|
|
continue
|
||
|
|
if not has_table_ref and not has_sql_dt:
|
||
|
|
skipped.append(
|
||
|
|
SkippedItem(
|
||
|
|
name=name,
|
||
|
|
item_type="view",
|
||
|
|
reason="no sql_table_name or derived_table.sql",
|
||
|
|
)
|
||
|
|
)
|
||
|
|
continue
|
||
|
|
|
||
|
|
source_type: Literal["table", "sql"] = "sql" if has_sql_dt else "table"
|
||
|
|
table_ref = rv.sql_table_name.strip() if rv.sql_table_name else None
|
||
|
|
raw_sql = rv.derived_table["sql"].strip() if has_sql_dt else None
|
||
|
|
|
||
|
|
# Build field→column lookup for this view
|
||
|
|
field_to_col = _build_field_column_map(rv)
|
||
|
|
# For sql sources, extract columns (always parse as postgres — LookML source dialect)
|
||
|
|
# then transpile to target dialect for the output SQL
|
||
|
|
sqlglot_columns: dict[str, str] = {}
|
||
|
|
sql_text = raw_sql
|
||
|
|
if raw_sql:
|
||
|
|
sqlglot_columns = _extract_sql_columns(raw_sql, "postgres", warnings, name)
|
||
|
|
sql_text = _transpile_sql(raw_sql, "postgres", dialect, warnings, name)
|
||
|
|
|
||
|
|
# Build columns
|
||
|
|
columns: list[ParsedColumn] = []
|
||
|
|
skipped_dims: list[SkippedItem] = []
|
||
|
|
grain: list[str] = []
|
||
|
|
|
||
|
|
for dim_name, dim in rv.dimensions.items():
|
||
|
|
col = _convert_dimension(dim_name, dim, source_type, field_to_col)
|
||
|
|
if isinstance(col, SkippedItem):
|
||
|
|
skipped_dims.append(col)
|
||
|
|
elif col is not None:
|
||
|
|
columns.append(col)
|
||
|
|
if dim.get("primary_key") == "yes":
|
||
|
|
grain.append(col.name)
|
||
|
|
|
||
|
|
for dg_name, dg in rv.dimension_groups.items():
|
||
|
|
col = _convert_dimension_group(dg_name, dg)
|
||
|
|
if col is not None:
|
||
|
|
columns.append(col)
|
||
|
|
|
||
|
|
# For sql sources, fill in any columns from sqlglot that aren't already declared
|
||
|
|
if source_type == "sql" and sqlglot_columns:
|
||
|
|
existing_names = {c.name for c in columns}
|
||
|
|
for col_name, col_type in sqlglot_columns.items():
|
||
|
|
if col_name not in existing_names:
|
||
|
|
columns.append(
|
||
|
|
ParsedColumn(
|
||
|
|
name=col_name,
|
||
|
|
lookml_name=col_name,
|
||
|
|
type=col_type,
|
||
|
|
role="default",
|
||
|
|
visibility="public",
|
||
|
|
)
|
||
|
|
)
|
||
|
|
|
||
|
|
# Build measures
|
||
|
|
measures: list[ParsedMeasure] = []
|
||
|
|
skipped_measures: list[SkippedItem] = []
|
||
|
|
|
||
|
|
for m_name, m in rv.measures.items():
|
||
|
|
result = _convert_measure(m_name, m, field_to_col, rv)
|
||
|
|
if isinstance(result, SkippedItem):
|
||
|
|
skipped_measures.append(result)
|
||
|
|
elif result is not None:
|
||
|
|
measures.append(result)
|
||
|
|
|
||
|
|
views.append(
|
||
|
|
ParsedLookMLView(
|
||
|
|
name=name,
|
||
|
|
source_type=source_type,
|
||
|
|
table_ref=table_ref,
|
||
|
|
sql=sql_text,
|
||
|
|
grain=grain,
|
||
|
|
columns=columns,
|
||
|
|
measures=measures,
|
||
|
|
description=rv.description or rv.label,
|
||
|
|
skipped_dimensions=skipped_dims,
|
||
|
|
skipped_measures=skipped_measures,
|
||
|
|
)
|
||
|
|
)
|
||
|
|
|
||
|
|
return views, skipped, warnings
|
||
|
|
|
||
|
|
|
||
|
|
def _build_field_column_map(rv: _RawView) -> dict[str, str]:
|
||
|
|
"""Build a map from LookML field name → actual DB column name or resolved expression.
|
||
|
|
|
||
|
|
For simple ${TABLE}.col dimensions, maps to the column name.
|
||
|
|
For computed dimensions, resolves ${TABLE}.col refs inline so measures can reference them.
|
||
|
|
"""
|
||
|
|
# First pass: collect direct column references
|
||
|
|
field_to_col: dict[str, str] = {}
|
||
|
|
for dim_name, dim in rv.dimensions.items():
|
||
|
|
sql = dim.get("sql", "")
|
||
|
|
match = TABLE_REF_RE.match(sql)
|
||
|
|
if match:
|
||
|
|
field_to_col[dim_name] = match.group(1)
|
||
|
|
|
||
|
|
for dg_name, dg in rv.dimension_groups.items():
|
||
|
|
sql = dg.get("sql", "")
|
||
|
|
match = TABLE_REF_RE.match(sql)
|
||
|
|
if match:
|
||
|
|
field_to_col[dg_name] = match.group(1)
|
||
|
|
for tf in dg.get("timeframes", []):
|
||
|
|
field_to_col[f"{dg_name}_{tf}"] = match.group(1)
|
||
|
|
|
||
|
|
# Second pass: resolve computed dimensions to their SQL with ${TABLE}.col replaced
|
||
|
|
for dim_name, dim in rv.dimensions.items():
|
||
|
|
if dim_name in field_to_col:
|
||
|
|
continue
|
||
|
|
sql = dim.get("sql", "")
|
||
|
|
if sql:
|
||
|
|
# Replace ${TABLE}.col → col
|
||
|
|
resolved = re.sub(r"\$\{TABLE\}\.(\w+)", r"\1", sql.strip())
|
||
|
|
# Replace ${field_name} with already-resolved column names
|
||
|
|
resolved = FIELD_REF_RE.sub(
|
||
|
|
lambda m: field_to_col.get(m.group(1), m.group(1)), resolved
|
||
|
|
)
|
||
|
|
field_to_col[dim_name] = resolved
|
||
|
|
|
||
|
|
return field_to_col
|
||
|
|
|
||
|
|
|
||
|
|
def _convert_dimension(
|
||
|
|
name: str,
|
||
|
|
dim: dict[str, Any],
|
||
|
|
source_type: str,
|
||
|
|
field_to_col: dict[str, str],
|
||
|
|
) -> ParsedColumn | SkippedItem | None:
|
||
|
|
"""Convert a LookML dimension to a ParsedColumn, or skip it."""
|
||
|
|
sql = dim.get("sql", "")
|
||
|
|
|
||
|
|
# Skip Liquid templating
|
||
|
|
if LIQUID_RE.search(sql):
|
||
|
|
return SkippedItem(
|
||
|
|
name=name, item_type="dimension", reason="Liquid templating not supported"
|
||
|
|
)
|
||
|
|
|
||
|
|
is_direct = TABLE_REF_RE.match(sql) is not None
|
||
|
|
|
||
|
|
if is_direct:
|
||
|
|
col_name = field_to_col.get(name, name)
|
||
|
|
expr = None
|
||
|
|
else:
|
||
|
|
# Computed dimension: use LookML dim name, store resolved expression in expr
|
||
|
|
col_name = name
|
||
|
|
expr = field_to_col.get(name)
|
||
|
|
if expr is None:
|
||
|
|
expr = re.sub(r"\$\{TABLE\}\.(\w+)", r"\1", sql.strip())
|
||
|
|
|
||
|
|
lookml_type = dim.get("type", "string")
|
||
|
|
ksl_type = LOOKML_TYPE_MAP.get(lookml_type, "string")
|
||
|
|
|
||
|
|
return ParsedColumn(
|
||
|
|
name=col_name,
|
||
|
|
lookml_name=name,
|
||
|
|
type=ksl_type,
|
||
|
|
role="default",
|
||
|
|
visibility="hidden" if dim.get("hidden") == "yes" else "public",
|
||
|
|
description=dim.get("description") or dim.get("label"),
|
||
|
|
is_computed=not is_direct,
|
||
|
|
expr=expr,
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def _convert_dimension_group(
|
||
|
|
name: str,
|
||
|
|
dg: dict[str, Any],
|
||
|
|
) -> ParsedColumn | None:
|
||
|
|
"""Convert a dimension_group to a single time column."""
|
||
|
|
if dg.get("type") != "time":
|
||
|
|
return None
|
||
|
|
|
||
|
|
sql = dg.get("sql", "")
|
||
|
|
match = TABLE_REF_RE.match(sql)
|
||
|
|
col_name = match.group(1) if match else name
|
||
|
|
|
||
|
|
return ParsedColumn(
|
||
|
|
name=col_name,
|
||
|
|
lookml_name=name,
|
||
|
|
type="time",
|
||
|
|
role="time",
|
||
|
|
visibility="hidden" if dg.get("hidden") == "yes" else "public",
|
||
|
|
description=dg.get("description") or dg.get("label"),
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
def _convert_measure(
|
||
|
|
name: str,
|
||
|
|
m: dict[str, Any],
|
||
|
|
field_to_col: dict[str, str],
|
||
|
|
rv: _RawView,
|
||
|
|
) -> ParsedMeasure | SkippedItem | None:
|
||
|
|
"""Convert a LookML measure to a ParsedMeasure, or skip it."""
|
||
|
|
measure_type = m.get("type", "")
|
||
|
|
sql = m.get("sql", "")
|
||
|
|
|
||
|
|
# Skip Liquid templating
|
||
|
|
if sql and LIQUID_RE.search(sql):
|
||
|
|
return SkippedItem(
|
||
|
|
name=name, item_type="measure", reason="Liquid templating not supported"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Skip cross-view references
|
||
|
|
if sql and VIEW_FIELD_REF_RE.search(sql):
|
||
|
|
return SkippedItem(
|
||
|
|
name=name, item_type="measure", reason="cross-view measure reference"
|
||
|
|
)
|
||
|
|
|
||
|
|
# Handle count (no sql needed)
|
||
|
|
if measure_type == "count" and not sql:
|
||
|
|
expr = "count(*)"
|
||
|
|
elif measure_type in MEASURE_TYPE_MAP:
|
||
|
|
if measure_type == "count":
|
||
|
|
# count with sql
|
||
|
|
expr = "count(*)"
|
||
|
|
else:
|
||
|
|
func = MEASURE_TYPE_MAP[measure_type]
|
||
|
|
# Resolve ${field_name} to column name
|
||
|
|
resolved_col = _resolve_measure_sql(sql, field_to_col)
|
||
|
|
if resolved_col is None:
|
||
|
|
return SkippedItem(
|
||
|
|
name=name,
|
||
|
|
item_type="measure",
|
||
|
|
reason=f"could not resolve sql reference: {sql}",
|
||
|
|
)
|
||
|
|
expr = f"{func}({resolved_col})"
|
||
|
|
elif measure_type == "date":
|
||
|
|
resolved = _resolve_measure_sql(sql, field_to_col)
|
||
|
|
if resolved is None:
|
||
|
|
return SkippedItem(
|
||
|
|
name=name,
|
||
|
|
item_type="measure",
|
||
|
|
reason=f"could not resolve sql reference for date measure: {sql}",
|
||
|
|
)
|
||
|
|
if _sql_contains_aggregate(resolved):
|
||
|
|
expr = resolved
|
||
|
|
else:
|
||
|
|
func = "max" if "max" in name.lower() else "min"
|
||
|
|
expr = f"{func}({resolved})"
|
||
|
|
elif measure_type == "number":
|
||
|
|
resolved = _resolve_derived_measure_sql(sql, field_to_col, rv)
|
||
|
|
if resolved is None:
|
||
|
|
return SkippedItem(
|
||
|
|
name=name,
|
||
|
|
item_type="measure",
|
||
|
|
reason="computed measure (type: number) with unresolvable references",
|
||
|
|
)
|
||
|
|
expr = resolved
|
||
|
|
else:
|
||
|
|
return SkippedItem(
|
||
|
|
name=name,
|
||
|
|
item_type="measure",
|
||
|
|
reason=f"unsupported measure type: {measure_type}",
|
||
|
|
)
|
||
|
|
|
||
|
|
# Handle filters
|
||
|
|
filter_str = _resolve_measure_filter(m, field_to_col, rv)
|
||
|
|
|
||
|
|
return ParsedMeasure(
|
||
|
|
name=name,
|
||
|
|
expr=expr,
|
||
|
|
filter=filter_str,
|
||
|
|
description=m.get("description") or m.get("label"),
|
||
|
|
)
|
||
|
|
|
||
|
|
|
||
|
|
SINGLE_FIELD_REF_RE = re.compile(r"^\s*\$\{(\w+)\}\s*$")
|
||
|
|
|
||
|
|
|
||
|
|
def _resolve_measure_sql(sql: str, field_to_col: dict[str, str]) -> str | None:
|
||
|
|
"""Resolve ${field_name} and ${TABLE}.col references in measure SQL to column expressions."""
|
||
|
|
if not sql:
|
||
|
|
return None
|
||
|
|
sql = sql.strip()
|
||
|
|
|
||
|
|
# Simple ${TABLE}.col → col (exact match, entire string)
|
||
|
|
table_match = TABLE_REF_RE.match(sql)
|
||
|
|
if table_match:
|
||
|
|
return table_match.group(1)
|
||
|
|
|
||
|
|
# Simple ${field_name} → resolved column (exact match, entire string)
|
||
|
|
field_match = SINGLE_FIELD_REF_RE.match(sql)
|
||
|
|
if field_match:
|
||
|
|
field_name = field_match.group(1)
|
||
|
|
return field_to_col.get(field_name, field_name)
|
||
|
|
|
||
|
|
# Complex expression with ${TABLE}.col or ${field} refs — resolve all inline
|
||
|
|
resolved = _resolve_field_refs_in_sql(sql, field_to_col)
|
||
|
|
if resolved != sql:
|
||
|
|
return resolved
|
||
|
|
|
||
|
|
return None
|
||
|
|
|
||
|
|
|
||
|
|
def _sql_contains_aggregate(sql: str) -> bool:
|
||
|
|
"""Check if an SQL expression already contains an aggregate function call."""
|
||
|
|
return bool(AGGREGATE_FUNC_RE.search(sql))
|
||
|
|
|
||
|
|
|
||
|
|
def _resolve_derived_measure_sql(
|
||
|
|
sql: str,
|
||
|
|
field_to_col: dict[str, str],
|
||
|
|
rv: _RawView,
|
||
|
|
) -> str | None:
|
||
|
|
"""Resolve a type:number measure's SQL, replacing ${field} refs with
|
||
|
|
measure names (for measure refs) or column expressions (for dimension refs).
|
||
|
|
|
||
|
|
Returns the resolved expression, or None if any reference is unresolvable.
|
||
|
|
"""
|
||
|
|
if not sql:
|
||
|
|
return None
|
||
|
|
|
||
|
|
if VIEW_FIELD_REF_RE.search(sql):
|
||
|
|
return None
|
||
|
|
|
||
|
|
def _replace_ref(match: re.Match[str]) -> str:
|
||
|
|
field_name = match.group(1)
|
||
|
|
if field_name == "TABLE":
|
||
|
|
return match.group(0)
|
||
|
|
if field_name in rv.measures:
|
||
|
|
return field_name
|
||
|
|
if field_name in field_to_col:
|
||
|
|
return field_to_col[field_name]
|
||
|
|
return match.group(0)
|
||
|
|
|
||
|
|
resolved = re.sub(r"\$\{TABLE\}\.(\w+)", r"\1", sql.strip())
|
||
|
|
resolved = FIELD_REF_RE.sub(_replace_ref, resolved)
|
||
|
|
|
||
|
|
if FIELD_REF_RE.search(resolved):
|
||
|
|
return None
|
||
|
|
|
||
|
|
return resolved
|
||
|
|
|
||
|
|
|
||
|
|
def _resolve_measure_filter(
|
||
|
|
m: dict[str, Any],
|
||
|
|
field_to_col: dict[str, str],
|
||
|
|
rv: _RawView,
|
||
|
|
) -> str | None:
|
||
|
|
"""Convert LookML measure filters to a KSL filter string."""
|
||
|
|
filters_all = m.get("filters__all")
|
||
|
|
if not filters_all:
|
||
|
|
return None
|
||
|
|
|
||
|
|
filter_parts: list[str] = []
|
||
|
|
for group in _iter_filter_groups(filters_all):
|
||
|
|
for field_name, value in group.items():
|
||
|
|
# Try to resolve the filter field to its underlying SQL
|
||
|
|
dim = rv.dimensions.get(field_name)
|
||
|
|
if dim and dim.get("type") == "yesno":
|
||
|
|
# yesno: the sql IS the boolean expression — resolve ${field} refs in it
|
||
|
|
dim_sql = _resolve_field_refs_in_sql(
|
||
|
|
dim.get("sql", "").strip(), field_to_col
|
||
|
|
)
|
||
|
|
if value == "yes":
|
||
|
|
filter_parts.append(dim_sql)
|
||
|
|
else:
|
||
|
|
filter_parts.append(f"NOT ({dim_sql})")
|
||
|
|
else:
|
||
|
|
col = field_to_col.get(field_name, field_name)
|
||
|
|
filter_parts.append(f"{col} = '{value}'")
|
||
|
|
|
||
|
|
return " AND ".join(filter_parts) if filter_parts else None
|
||
|
|
|
||
|
|
|
||
|
|
def _resolve_field_refs_in_sql(sql: str, field_to_col: dict[str, str]) -> str:
|
||
|
|
"""Replace ${field_name} and ${TABLE}.col references in SQL with actual column names."""
|
||
|
|
# First replace ${TABLE}.col
|
||
|
|
sql = re.sub(r"\$\{TABLE\}\.(\w+)", r"\1", sql)
|
||
|
|
# Then replace ${field_name}
|
||
|
|
sql = FIELD_REF_RE.sub(lambda m: field_to_col.get(m.group(1), m.group(1)), sql)
|
||
|
|
return sql
|
||
|
|
|
||
|
|
|
||
|
|
def _iter_filter_groups(filters_all: Any) -> list[dict[str, str]]:
|
||
|
|
"""Iterate through lkml's filters__all nested structure."""
|
||
|
|
result: list[dict[str, str]] = []
|
||
|
|
if isinstance(filters_all, list):
|
||
|
|
for item in filters_all:
|
||
|
|
if isinstance(item, dict):
|
||
|
|
result.append(item)
|
||
|
|
elif isinstance(item, list):
|
||
|
|
for sub in item:
|
||
|
|
if isinstance(sub, dict):
|
||
|
|
result.append(sub)
|
||
|
|
return result
|
||
|
|
|
||
|
|
|
||
|
|
# ── sqlglot: transpile and extract columns from derived table SQL ─────
|
||
|
|
|
||
|
|
|
||
|
|
def _transpile_sql(
|
||
|
|
sql: str,
|
||
|
|
source_dialect: str,
|
||
|
|
target_dialect: str,
|
||
|
|
warnings: list[str],
|
||
|
|
view_name: str,
|
||
|
|
) -> str:
|
||
|
|
"""Transpile SQL from source dialect to target dialect using sqlglot."""
|
||
|
|
if source_dialect == target_dialect:
|
||
|
|
return sql
|
||
|
|
try:
|
||
|
|
results = sqlglot.transpile(sql, read=source_dialect, write=target_dialect)
|
||
|
|
return results[0] if results else sql
|
||
|
|
except Exception as e:
|
||
|
|
warnings.append(
|
||
|
|
f"sqlglot transpile failed for view '{view_name}' "
|
||
|
|
f"({source_dialect} → {target_dialect}): {e}"
|
||
|
|
)
|
||
|
|
return sql
|
||
|
|
|
||
|
|
|
||
|
|
def _extract_sql_columns(
|
||
|
|
sql: str, dialect: str, warnings: list[str], view_name: str
|
||
|
|
) -> dict[str, str]:
|
||
|
|
"""Extract output column names and inferred types from a SELECT statement."""
|
||
|
|
try:
|
||
|
|
tree = sqlglot.parse_one(sql, read=dialect)
|
||
|
|
except Exception as e:
|
||
|
|
warnings.append(f"sqlglot parse failed for view '{view_name}': {e}")
|
||
|
|
return {}
|
||
|
|
|
||
|
|
columns: dict[str, str] = {}
|
||
|
|
for expr_node in tree.expressions:
|
||
|
|
alias = expr_node.alias
|
||
|
|
if not alias and isinstance(expr_node, exp.Column):
|
||
|
|
alias = expr_node.name
|
||
|
|
if alias:
|
||
|
|
col_type = _infer_column_type(expr_node)
|
||
|
|
columns[alias.lower()] = col_type
|
||
|
|
|
||
|
|
return columns
|
||
|
|
|
||
|
|
|
||
|
|
def _infer_column_type(node: exp.Expression) -> str:
|
||
|
|
"""Infer KSL column type from a sqlglot AST node."""
|
||
|
|
# Check if it's an aggregate
|
||
|
|
inner = node.unalias() if hasattr(node, "unalias") else node
|
||
|
|
|
||
|
|
if isinstance(inner, (exp.Count, exp.Sum, exp.Avg, exp.Min, exp.Max)):
|
||
|
|
return "number"
|
||
|
|
if isinstance(inner, exp.Anonymous):
|
||
|
|
func_name = inner.name.upper() if hasattr(inner, "name") else ""
|
||
|
|
if func_name in ("COUNT", "SUM", "AVG", "MIN", "MAX", "COUNT_DISTINCT"):
|
||
|
|
return "number"
|
||
|
|
|
||
|
|
# Check for CASE expressions (usually string or number)
|
||
|
|
if isinstance(inner, exp.Case):
|
||
|
|
return "string"
|
||
|
|
|
||
|
|
# Check for date functions
|
||
|
|
if isinstance(
|
||
|
|
inner,
|
||
|
|
(
|
||
|
|
exp.DateTrunc,
|
||
|
|
exp.DateAdd,
|
||
|
|
exp.DateSub,
|
||
|
|
exp.CurrentDate,
|
||
|
|
exp.CurrentTimestamp,
|
||
|
|
),
|
||
|
|
):
|
||
|
|
return "time"
|
||
|
|
|
||
|
|
# Default: assume number for aggregates, string otherwise
|
||
|
|
if inner.find(exp.AggFunc):
|
||
|
|
return "number"
|
||
|
|
|
||
|
|
return "string"
|
||
|
|
|
||
|
|
|
||
|
|
# ── Joins ──────────────────────────────────────────────────────────────
|
||
|
|
|
||
|
|
|
||
|
|
def _build_parsed_joins(
|
||
|
|
raw_explores: list[dict[str, Any]],
|
||
|
|
resolved_views: dict[str, _RawView],
|
||
|
|
) -> list[ParsedJoin]:
|
||
|
|
"""Convert explore join definitions to ParsedJoin list."""
|
||
|
|
joins: list[ParsedJoin] = []
|
||
|
|
seen: set[tuple[str, str, str | None]] = set() # (source, target, alias)
|
||
|
|
|
||
|
|
for explore in raw_explores:
|
||
|
|
explore_base = explore.get("view_name") or explore.get("name", "")
|
||
|
|
|
||
|
|
for join_data in explore.get("joins", []):
|
||
|
|
join_name = join_data.get("name", "")
|
||
|
|
from_view = join_data.get("from")
|
||
|
|
target_view = from_view or join_name
|
||
|
|
alias = join_name if from_view else None
|
||
|
|
relationship = join_data.get("relationship", "many_to_one")
|
||
|
|
sql_on = join_data.get("sql_on", "")
|
||
|
|
|
||
|
|
# Parse sql_on to extract source view and resolved condition
|
||
|
|
source_view, on_clause = _parse_sql_on(
|
||
|
|
sql_on, explore_base, join_name, resolved_views
|
||
|
|
)
|
||
|
|
|
||
|
|
key = (source_view, target_view, alias)
|
||
|
|
if key in seen:
|
||
|
|
continue
|
||
|
|
seen.add(key)
|
||
|
|
|
||
|
|
joins.append(
|
||
|
|
ParsedJoin(
|
||
|
|
source_view=source_view,
|
||
|
|
to=target_view,
|
||
|
|
alias=alias,
|
||
|
|
on=on_clause,
|
||
|
|
relationship=relationship,
|
||
|
|
)
|
||
|
|
)
|
||
|
|
|
||
|
|
return joins
|
||
|
|
|
||
|
|
|
||
|
|
def _parse_sql_on(
|
||
|
|
sql_on: str,
|
||
|
|
explore_base: str,
|
||
|
|
join_name: str,
|
||
|
|
resolved_views: dict[str, _RawView],
|
||
|
|
) -> tuple[str, str]:
|
||
|
|
"""Parse a sql_on expression to extract the source view and KSL-format on clause.
|
||
|
|
|
||
|
|
Returns (source_view, on_clause).
|
||
|
|
"""
|
||
|
|
refs = VIEW_FIELD_REF_RE.findall(sql_on)
|
||
|
|
if not refs:
|
||
|
|
return explore_base, sql_on.strip()
|
||
|
|
|
||
|
|
# Resolve each ${view.field} to view.actual_column
|
||
|
|
def resolve_ref(match: re.Match[str]) -> str:
|
||
|
|
view_name = match.group(1)
|
||
|
|
field_name = match.group(2)
|
||
|
|
rv = resolved_views.get(view_name)
|
||
|
|
if rv:
|
||
|
|
# Look up actual column name
|
||
|
|
dim = rv.dimensions.get(field_name)
|
||
|
|
if dim:
|
||
|
|
table_match = TABLE_REF_RE.match(dim.get("sql", ""))
|
||
|
|
if table_match:
|
||
|
|
return f"{view_name}.{table_match.group(1)}"
|
||
|
|
# Check dimension_groups
|
||
|
|
for dg_name, dg in rv.dimension_groups.items():
|
||
|
|
if field_name == dg_name or field_name.startswith(f"{dg_name}_"):
|
||
|
|
dg_match = TABLE_REF_RE.match(dg.get("sql", ""))
|
||
|
|
if dg_match:
|
||
|
|
return f"{view_name}.{dg_match.group(1)}"
|
||
|
|
return f"{view_name}.{field_name}"
|
||
|
|
|
||
|
|
on_clause = VIEW_FIELD_REF_RE.sub(resolve_ref, sql_on).strip()
|
||
|
|
|
||
|
|
# Determine source view: first referenced view that isn't the join target
|
||
|
|
source_view = explore_base
|
||
|
|
for view_name, _ in refs:
|
||
|
|
if view_name != join_name:
|
||
|
|
source_view = view_name
|
||
|
|
break
|
||
|
|
return source_view, on_clause
|