mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-10 08:05:14 +02:00
feat: emit semantic daemon telemetry
This commit is contained in:
parent
f72c7c8bdc
commit
72738958c3
2 changed files with 119 additions and 10 deletions
|
|
@ -2,18 +2,23 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
from pydantic import BaseModel, Field
|
||||
from ktx_daemon.telemetry import error_class, track_telemetry_event
|
||||
from pydantic import BaseModel, ConfigDict, Field
|
||||
from semantic_layer.duplicate_check import validate_measure_duplicates
|
||||
from semantic_layer.engine import SemanticEngine
|
||||
from semantic_layer.models import QueryResult, SourceDefinition
|
||||
|
||||
|
||||
class SemanticLayerQueryRequest(BaseModel):
    # Request payload for a semantic-layer query.
    #
    # `populate_by_name=True` lets callers supply `project_id` either by its
    # Python field name or by its `projectId` alias.
    model_config = ConfigDict(populate_by_name=True)

    # Raw source definitions, handed to `_load_sources` by the query handler.
    sources: list[dict[str, Any]]
    # Raw query specification, handed to the engine's `query` method.
    query: dict[str, Any]
    # SQL dialect passed to the engine; defaults to "postgres".
    dialect: str = "postgres"
    # Optional project identifier forwarded to telemetry events.
    project_id: str | None = Field(default=None, alias="projectId")
|
||||
|
||||
|
||||
class SemanticLayerQueryResponse(BaseModel):
|
||||
|
|
@ -79,15 +84,73 @@ def _response_columns(result: QueryResult) -> list[dict[str, Any]]:
|
|||
def _elapsed_ms(since: float) -> float:
    """Return the non-negative milliseconds elapsed since *since*.

    *since* is a ``time.perf_counter()`` reading.
    """
    return max(0, (time.perf_counter() - since) * 1000)


def query_semantic_layer(
    request: SemanticLayerQueryRequest,
) -> SemanticLayerQueryResponse:
    """Run a semantic-layer query and emit plan / SQL-generation telemetry.

    The request's sources are loaded, an engine is built for the requested
    dialect, the query is executed, and the response is assembled. Every call
    emits one ``sl_plan_completed`` event; a ``sql_gen_completed`` event is
    emitted on success, and on failure only when the failure happened in the
    "compile" or "transpile" stage.

    Args:
        request: Sources, query, dialect and optional project id.

    Returns:
        The generated SQL, dialect, response columns and resolved plan.

    Raises:
        Exception: Any error raised while loading sources, building the
            engine, running the query or assembling the response is
            re-raised unchanged after the error telemetry has been emitted.
    """
    started = time.perf_counter()
    # `stage` records how far processing got, so a failure can be attributed.
    stage = "parse"
    source_count = 0
    join_count = 0
    # Until the compile stage begins, SQL timing falls back to the overall start.
    sql_started = started
    try:
        sources = _load_sources(request.sources)
        source_count = len(sources)
        join_count = sum(len(source.joins) for source in sources.values())
        stage = "resolve"
        engine = SemanticEngine.from_sources(sources, dialect=request.dialect)
        stage = "compile"
        sql_started = time.perf_counter()
        result = engine.query(request.query)
        stage = "transpile"
        # Build the response inside the guarded region, before any "ok" event
        # is sent, so a failure here is reported as an error only.
        response = SemanticLayerQueryResponse(
            sql=result.sql,
            dialect=result.dialect,
            columns=_response_columns(result),
            plan=result.resolved_plan.model_dump(mode="json"),
        )
    except Exception as error:
        klass = error_class(error)
        fields: dict[str, Any] = {
            "outcome": "error",
            "stage": stage,
            "durationMs": _elapsed_ms(started),
            "sourceCount": source_count,
            "joinCount": join_count,
        }
        if klass:
            fields["errorClass"] = klass
        track_telemetry_event(
            "sl_plan_completed", fields, project_id=request.project_id
        )
        # SQL generation only counts as attempted once compilation has begun.
        if stage in {"compile", "transpile"}:
            sql_fields: dict[str, Any] = {
                "outcome": "error",
                "dialect": request.dialect,
                "durationMs": _elapsed_ms(sql_started),
            }
            if klass:
                sql_fields["errorClass"] = klass
            track_telemetry_event(
                "sql_gen_completed", sql_fields, project_id=request.project_id
            )
        raise
    else:
        # Success events live outside the `try` so that a telemetry failure
        # cannot be re-reported as a query error.
        track_telemetry_event(
            "sl_plan_completed",
            {
                "outcome": "ok",
                "stage": stage,
                "durationMs": _elapsed_ms(started),
                "sourceCount": source_count,
                "joinCount": join_count,
            },
            project_id=request.project_id,
        )
        track_telemetry_event(
            "sql_gen_completed",
            {
                "outcome": "ok",
                "dialect": result.dialect,
                "durationMs": _elapsed_ms(sql_started),
            },
            project_id=request.project_id,
        )
        return response
|
||||
|
||||
|
||||
def validate_semantic_layer(request: ValidateSourcesRequest) -> ValidateSourcesResponse:
|
||||
|
|
|
|||
|
|
@ -1,5 +1,8 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
from pathlib import Path
|
||||
|
||||
from ktx_daemon.semantic_layer import (
|
||||
SemanticLayerQueryRequest,
|
||||
ValidateSourcesRequest,
|
||||
|
|
@ -46,6 +49,49 @@ def test_query_semantic_layer_generates_sql_and_plan() -> None:
|
|||
assert response.plan["sources_used"] == ["orders"]
|
||||
|
||||
|
||||
def test_query_semantic_layer_emits_plan_and_sql_debug_events(
    tmp_path: Path,
    monkeypatch,
    capsys,
) -> None:
    # Runs one query against a temporary HOME holding an enabled telemetry
    # identity, then checks what was written to stderr.
    from ktx_daemon.telemetry.identity import reset_identity_cache

    reset_identity_cache()

    # Seed `<HOME>/.ktx/telemetry.json` with an enabled identity record.
    telemetry_dir = tmp_path / ".ktx"
    telemetry_dir.mkdir(parents=True)
    identity = {
        "installId": "00000000-0000-4000-8000-000000000000",
        "enabled": True,
        "createdAt": "2026-05-22T14:33:02.000Z",
    }
    (telemetry_dir / "telemetry.json").write_text(
        json.dumps(identity) + "\n",
        encoding="utf-8",
    )
    monkeypatch.setenv("HOME", str(tmp_path))
    # NOTE(review): presumably debug mode echoes events to stderr — the
    # assertions below rely on that; confirm against the telemetry module.
    monkeypatch.setenv("KTX_TELEMETRY_DEBUG", "1")

    request = SemanticLayerQueryRequest(
        sources=[ORDERS_SOURCE],
        dialect="postgres",
        projectId="a" * 64,
        query={
            "measures": ["orders.order_count"],
            "dimensions": ["orders.status"],
            "limit": 25,
        },
    )
    query_semantic_layer(request)

    stderr_text = capsys.readouterr().err
    for event_name in ("sl_plan_completed", "sql_gen_completed"):
        assert f'"event": "{event_name}"' in stderr_text
    # The emitted events must not contain this table reference.
    assert "public.orders" not in stderr_text
|
||||
|
||||
|
||||
def test_validate_semantic_layer_reports_duplicate_measure_names() -> None:
|
||||
invalid_source = {
|
||||
**ORDERS_SOURCE,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue