class SemanticLayerQueryRequest(BaseModel):
    """Request payload accepted by ``query_semantic_layer``."""

    # Lets callers populate ``project_id`` by its field name as well as by
    # the ``projectId`` alias declared below.
    model_config = ConfigDict(populate_by_name=True)

    # Raw source definitions; handed to ``_load_sources``.
    sources: list[dict[str, Any]]
    # Raw query; handed unchanged to ``SemanticEngine.query``.
    query: dict[str, Any]
    # SQL dialect passed to ``SemanticEngine.from_sources``.
    dialect: str = "postgres"
    # Optional project identifier; only forwarded to telemetry calls.
    project_id: str | None = Field(default=None, alias="projectId")


def _elapsed_ms(since: float) -> float:
    """Return the non-negative milliseconds elapsed since *since*.

    *since* is an earlier ``time.perf_counter()`` reading.
    """
    return max(0.0, (time.perf_counter() - since) * 1000)


def query_semantic_layer(
    request: SemanticLayerQueryRequest,
) -> SemanticLayerQueryResponse:
    """Run a semantic-layer query and report its outcome via telemetry.

    The work is tracked as a sequence of stages (``parse`` -> ``resolve`` ->
    ``compile`` -> ``transpile``); the stage reached is attached to the
    ``sl_plan_completed`` event. A ``sql_gen_completed`` event is emitted as
    well once the ``compile`` stage has been entered.

    Exactly one outcome is reported per call: success events are emitted only
    after the response object has been built, so a failure while assembling
    the response is reported as an error and never as both ok and error.

    Args:
        request: Sources, query, dialect and optional project id.

    Returns:
        The generated SQL, its dialect, the response columns and the resolved
        plan dumped in JSON mode.

    Raises:
        Exception: Any error raised while loading sources, building the
            engine, running the query or assembling the response is re-raised
            unchanged after the error telemetry has been emitted.
    """
    started = time.perf_counter()
    # Until the compile stage starts, SQL timing shares the overall start so
    # the name is always bound for the error path.
    sql_started = started
    stage = "parse"
    source_count = 0
    join_count = 0
    try:
        sources = _load_sources(request.sources)
        source_count = len(sources)
        join_count = sum(len(source.joins) for source in sources.values())
        stage = "resolve"
        engine = SemanticEngine.from_sources(sources, dialect=request.dialect)
        stage = "compile"
        sql_started = time.perf_counter()
        result = engine.query(request.query)
        stage = "transpile"
        # Freeze the success timings here so that response assembly below
        # does not inflate them.
        plan_ms = _elapsed_ms(started)
        sql_ms = _elapsed_ms(sql_started)
        response = SemanticLayerQueryResponse(
            sql=result.sql,
            dialect=result.dialect,
            columns=_response_columns(result),
            plan=result.resolved_plan.model_dump(mode="json"),
        )
    except Exception as error:
        klass = error_class(error)
        fields: dict[str, Any] = {
            "outcome": "error",
            "stage": stage,
            "durationMs": _elapsed_ms(started),
            "sourceCount": source_count,
            "joinCount": join_count,
        }
        if klass:
            fields["errorClass"] = klass
        track_telemetry_event(
            "sl_plan_completed", fields, project_id=request.project_id
        )
        # SQL generation is only reported once it has actually been started.
        if stage in {"compile", "transpile"}:
            sql_fields: dict[str, Any] = {
                "outcome": "error",
                "dialect": request.dialect,
                "durationMs": _elapsed_ms(sql_started),
            }
            if klass:
                sql_fields["errorClass"] = klass
            track_telemetry_event(
                "sql_gen_completed", sql_fields, project_id=request.project_id
            )
        raise

    # Success path: emitted outside the ``try`` so a failure here cannot
    # trigger a second, contradictory "error" event for the same request.
    track_telemetry_event(
        "sl_plan_completed",
        {
            "outcome": "ok",
            "stage": stage,
            "durationMs": plan_ms,
            "sourceCount": source_count,
            "joinCount": join_count,
        },
        project_id=request.project_id,
    )
    track_telemetry_event(
        "sql_gen_completed",
        {
            "outcome": "ok",
            "dialect": result.dialect,
            "durationMs": sql_ms,
        },
        project_id=request.project_id,
    )
    return response
def test_query_semantic_layer_emits_plan_and_sql_debug_events(
    tmp_path: Path,
    monkeypatch,
    capsys,
) -> None:
    """Check that a query writes both telemetry event names to stderr.

    The test writes an enabled telemetry identity file under
    ``tmp_path/.ktx/telemetry.json``, points ``HOME`` at ``tmp_path`` and sets
    ``KTX_TELEMETRY_DEBUG=1`` before running a query. It then asserts that
    stderr names both events and does not contain ``public.orders``.
    """
    from ktx_daemon.telemetry.identity import reset_identity_cache

    reset_identity_cache()

    telemetry_dir = tmp_path / ".ktx"
    telemetry_dir.mkdir(parents=True)
    identity = {
        "installId": "00000000-0000-4000-8000-000000000000",
        "enabled": True,
        "createdAt": "2026-05-22T14:33:02.000Z",
    }
    (telemetry_dir / "telemetry.json").write_text(
        json.dumps(identity) + "\n",
        encoding="utf-8",
    )

    # NOTE(review): presumably the identity file is looked up relative to
    # HOME -- confirm against ktx_daemon.telemetry.identity.
    for name, value in (("HOME", str(tmp_path)), ("KTX_TELEMETRY_DEBUG", "1")):
        monkeypatch.setenv(name, value)

    request = SemanticLayerQueryRequest(
        sources=[ORDERS_SOURCE],
        dialect="postgres",
        projectId="a" * 64,
        query={
            "measures": ["orders.order_count"],
            "dimensions": ["orders.status"],
            "limit": 25,
        },
    )
    query_semantic_layer(request)

    stderr = capsys.readouterr().err
    for event_name in ("sl_plan_completed", "sql_gen_completed"):
        assert f'"event": "{event_name}"' in stderr
    # NOTE(review): "public.orders" looks like the table behind ORDERS_SOURCE;
    # the intent appears to be that table names never reach telemetry output.
    assert "public.orders" not in stderr