mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-13 08:15:14 +02:00
feat: add daemon telemetry foundation
This commit is contained in:
parent
2cbafa3df6
commit
f72c7c8bdc
9 changed files with 3222 additions and 0 deletions
5
python/ktx-daemon/src/ktx_daemon/telemetry/__init__.py
Normal file
5
python/ktx-daemon/src/ktx_daemon/telemetry/__init__.py
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
from __future__ import annotations
|
||||
|
||||
from ktx_daemon.telemetry.emitter import error_class, track_telemetry_event
|
||||
|
||||
# Public names of the telemetry package; both are imported above from
# ktx_daemon.telemetry.emitter.
__all__ = ["error_class", "track_telemetry_event"]
|
||||
105
python/ktx-daemon/src/ktx_daemon/telemetry/emitter.py
Normal file
105
python/ktx-daemon/src/ktx_daemon/telemetry/emitter.py
Normal file
|
|
@ -0,0 +1,105 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
from collections.abc import Mapping
|
||||
|
||||
from ktx_daemon.telemetry.events import build_telemetry_event
|
||||
from ktx_daemon.telemetry.identity import load_telemetry_identity
|
||||
|
||||
# PostHog project API key. It is blank here, and `_live_configured` treats a
# blank key as "live delivery is not configured".
POSTHOG_PROJECT_API_KEY = ""

# Default PostHog host. `_host` returns this only when the environment does not
# provide a non-empty KTX_TELEMETRY_ENDPOINT.
POSTHOG_HOST = ""
|
||||
|
||||
|
||||
def _host(env: Mapping[str, str]) -> str:
|
||||
return env.get("KTX_TELEMETRY_ENDPOINT") or POSTHOG_HOST
|
||||
|
||||
|
||||
def _live_configured(host: str) -> bool:
    """Report whether events can actually be delivered.

    True only when both ``POSTHOG_PROJECT_API_KEY`` and *host* contain
    something other than whitespace.
    """
    if not POSTHOG_PROJECT_API_KEY.strip():
        return False
    return bool(host.strip())
|
||||
|
||||
|
||||
def _debug_enabled(env: Mapping[str, str]) -> bool:
|
||||
return env.get("KTX_TELEMETRY_DEBUG") == "1"
|
||||
|
||||
|
||||
def _scrub_error_class(error: BaseException) -> str | None:
|
||||
name = type(error).__name__
|
||||
if len(name) > 80:
|
||||
return None
|
||||
if any(marker in name for marker in ("/", "\\", "@", "://")):
|
||||
return None
|
||||
if not name[:1].isupper() or not name.replace("_", "").isalnum():
|
||||
return None
|
||||
return name
|
||||
|
||||
|
||||
def error_class(error: BaseException) -> str | None:
    """Return a reportable class name for *error*, or None if it is rejected.

    Public wrapper that delegates entirely to ``_scrub_error_class``.
    """
    scrubbed = _scrub_error_class(error)
    return scrubbed
|
||||
|
||||
|
||||
def track_telemetry_event(
    name: str,
    fields: dict[str, Any],
    *,
    project_id: str | None = None,
    home_dir: Path | None = None,
    env: Mapping[str, str] | None = None,
) -> None:
    """Emit one telemetry event on a best-effort basis.

    The event is dropped silently when the loaded identity is disabled or has
    no install id, when ``build_telemetry_event`` rejects the name/fields with
    ``ValueError``, or when no live endpoint is configured. With
    ``KTX_TELEMETRY_DEBUG=1`` the payload is written to stderr as one JSON
    line and nothing is sent.

    Args:
        name: Event name passed to ``build_telemetry_event``.
        fields: Event-specific properties passed to ``build_telemetry_event``.
        project_id: When truthy, attached as the ``project`` group.
        home_dir: Forwarded to ``load_telemetry_identity``.
        env: Environment mapping to consult; ``None`` means ``os.environ``.
            An explicitly passed empty mapping is respected as-is.
    """
    # `env or os.environ` would replace an explicit empty mapping with the real
    # process environment, so only fall back when no mapping was given at all.
    source_env = os.environ if env is None else env

    identity = load_telemetry_identity(home_dir=home_dir, env=source_env)
    if not identity.enabled or not identity.install_id:
        return

    try:
        event = build_telemetry_event(name, fields)
    except ValueError:
        # Unknown event or field mismatch: drop the event rather than fail.
        return

    groups = {"project": project_id} if project_id else None

    if _debug_enabled(source_env):
        payload = {
            "distinctId": identity.install_id,
            "event": event["event"],
            "properties": event["properties"],
            "groups": groups,
        }
        sys.stderr.write("[telemetry] " + json.dumps(payload, sort_keys=True) + "\n")
        return

    host = _host(source_env)
    if not _live_configured(host):
        return

    try:
        # Imported lazily so the dependency is only needed for live delivery.
        from posthog import Posthog

        client = Posthog(
            POSTHOG_PROJECT_API_KEY,
            host=host,
            flush_at=1,
            flush_interval=0,
            sync_mode=True,
            timeout=1,
            disable_geoip=True,
        )
        try:
            client.capture(
                event=event["event"],
                distinct_id=identity.install_id,
                properties=event["properties"],
                groups=groups,
                disable_geoip=True,
            )
        finally:
            # Shut the client down even when capture() raises.
            client.shutdown()
    except Exception:
        # Deliberate best-effort boundary: delivery problems are swallowed.
        return
|
||||
72
python/ktx-daemon/src/ktx_daemon/telemetry/events.py
Normal file
72
python/ktx-daemon/src/ktx_daemon/telemetry/events.py
Normal file
|
|
@ -0,0 +1,72 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import platform
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
from ktx_daemon import VERSION
|
||||
|
||||
# events.schema.json is looked up in the same directory as this module.
SCHEMA_PATH = Path(__file__).parent / "events.schema.json"

# Property names that `_common_envelope` attaches to every event.
COMMON_FIELDS = {
    "arch",
    "cliVersion",
    "isCi",
    "nodeVersion",
    "osPlatform",
    "osRelease",
    "runtime",
}

# Catalog entries that `_schema_catalog` keeps; all other entries are skipped.
DAEMON_EVENTS = {
    "daemon_started",
    "daemon_stopped",
    "sl_plan_completed",
    "sql_gen_completed",
}
|
||||
|
||||
|
||||
def _schema_catalog() -> dict[str, set[str]]:
    """Load the allowed field names for each daemon event from the schema file.

    Reads ``SCHEMA_PATH`` as UTF-8 JSON, walks its ``"x-ktx-catalog"`` entries
    and keeps those whose ``"name"`` is in ``DAEMON_EVENTS``.

    Returns:
        Mapping of event name to the set built from that entry's ``"fields"``.
    """
    document = json.loads(SCHEMA_PATH.read_text(encoding="utf-8"))

    catalog: dict[str, set[str]] = {}
    for entry in document["x-ktx-catalog"]:
        if entry["name"] in DAEMON_EVENTS:
            catalog[entry["name"]] = set(entry["fields"])
    return catalog
|
||||
|
||||
|
||||
# Event name -> allowed field names. Computed once when this module is imported,
# which means importing it reads SCHEMA_PATH from disk via _schema_catalog().
EVENT_FIELDS = _schema_catalog()
|
||||
|
||||
|
||||
def _common_envelope() -> dict[str, Any]:
    """Build the properties attached to every telemetry event.

    Values come from the running interpreter, the ``platform`` module and the
    process environment. ``nodeVersion`` carries the Python version as
    ``major.minor.micro``.
    """
    python_version = (
        f"{sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}"
    )

    envelope: dict[str, Any] = {}
    # KTX_DAEMON_VERSION in the environment overrides the packaged VERSION.
    envelope["cliVersion"] = os.environ.get("KTX_DAEMON_VERSION", VERSION)
    envelope["nodeVersion"] = python_version
    envelope["osPlatform"] = sys.platform
    envelope["osRelease"] = platform.release()
    envelope["arch"] = platform.machine()
    envelope["runtime"] = "daemon-py"
    # Any non-empty CI value counts as running in CI.
    envelope["isCi"] = bool(os.environ.get("CI"))
    return envelope
|
||||
|
||||
|
||||
def build_telemetry_event(name: str, fields: dict[str, Any]) -> dict[str, Any]:
    """Validate *fields* against the catalog entry for *name* and build the event.

    Args:
        name: Event name; must be a key of ``EVENT_FIELDS``.
        fields: Event-specific properties. Every catalogued field except
            ``errorClass`` must be present, and no other keys are allowed.

    Returns:
        ``{"event": name, "properties": ...}`` where the properties are the
        common envelope updated with *fields*.

    Raises:
        ValueError: If the event is unknown, or fields are unexpected or missing.
    """
    allowed = EVENT_FIELDS.get(name)
    if allowed is None:
        raise ValueError(f"unknown telemetry event: {name}")

    unexpected = set(fields).difference(allowed)
    if unexpected:
        raise ValueError(f"unknown telemetry fields for {name}: {sorted(unexpected)}")

    # "errorClass" is the one catalogued field callers may omit.
    required = allowed - {"errorClass"}
    absent = required.difference(fields)
    if absent:
        raise ValueError(f"missing telemetry fields for {name}: {sorted(absent)}")

    properties = _common_envelope()
    properties.update(fields)
    return {"event": name, "properties": properties}
|
||||
1407
python/ktx-daemon/src/ktx_daemon/telemetry/events.schema.json
Normal file
1407
python/ktx-daemon/src/ktx_daemon/telemetry/events.schema.json
Normal file
File diff suppressed because it is too large
Load diff
77
python/ktx-daemon/src/ktx_daemon/telemetry/identity.py
Normal file
77
python/ktx-daemon/src/ktx_daemon/telemetry/identity.py
Normal file
|
|
@ -0,0 +1,77 @@
|
|||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from collections.abc import Mapping
|
||||
|
||||
# load_telemetry_identity reuses a cached identity while it is younger than this
# many seconds (measured with its clock); after that the file is read again.
IDENTITY_TTL_SECONDS = 60.0
|
||||
|
||||
|
||||
@dataclass(frozen=True)
class TelemetryIdentity:
    """Immutable snapshot of the telemetry identity for one identity file."""

    # The "installId" value from the identity file when it is a string, else None.
    install_id: str | None
    # Whether telemetry may be sent for this identity.
    enabled: bool
    # Location of the identity file this snapshot refers to.
    path: Path
|
||||
|
||||
|
||||
# Most recent identity read, stored as (clock reading, file path, identity).
# None until load_telemetry_identity populates it, or after reset_identity_cache().
_cache: tuple[float, Path, TelemetryIdentity] | None = None
|
||||
|
||||
|
||||
def _telemetry_path(home_dir: Path | None = None) -> Path:
|
||||
return (home_dir or Path.home()) / ".ktx" / "telemetry.json"
|
||||
|
||||
|
||||
def _env_disables(env: Mapping[str, str] | None = None) -> bool:
|
||||
source = env or os.environ
|
||||
return bool(source.get("KTX_TELEMETRY_DISABLED") or source.get("DO_NOT_TRACK"))
|
||||
|
||||
|
||||
def _read_identity(path: Path) -> TelemetryIdentity:
    """Read the telemetry identity stored at *path*.

    Never raises for a missing, unreadable or malformed file; those cases all
    produce a disabled identity with no install id.

    Args:
        path: Location of the identity JSON file.

    Returns:
        An enabled identity only when the file holds a JSON object whose
        ``"installId"`` is a string and whose ``"enabled"`` is exactly
        ``True``. A string ``"installId"`` is preserved even when the
        identity is disabled.
    """
    disabled = TelemetryIdentity(install_id=None, enabled=False, path=path)

    try:
        raw = json.loads(path.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and the UnicodeDecodeError
        # that read_text raises for a file that is not valid UTF-8.
        return disabled

    # Valid JSON need not be an object; lists, strings and numbers have no
    # .get(), so treat them like a malformed file.
    if not isinstance(raw, dict):
        return disabled

    install_id = raw.get("installId")
    if not isinstance(install_id, str):
        return disabled

    return TelemetryIdentity(
        install_id=install_id,
        enabled=raw.get("enabled") is True,
        path=path,
    )
|
||||
|
||||
|
||||
def load_telemetry_identity(
    *,
    home_dir: Path | None = None,
    env: Mapping[str, str] | None = None,
    now: Callable[[], float] | None = None,
) -> TelemetryIdentity:
    """Return the telemetry identity, re-reading the file at most once per TTL.

    Args:
        home_dir: Forwarded to ``_telemetry_path`` to locate the identity file.
        env: Forwarded to ``_env_disables`` to check for an opt-out.
        now: Clock returning seconds; ``time.monotonic`` when not given.

    Returns:
        The cached or freshly read identity. When the environment opts out, a
        copy with ``enabled=False`` is returned instead; the cached entry
        itself is left untouched.
    """
    global _cache

    path = _telemetry_path(home_dir)
    clock = now or time.monotonic
    current = float(clock())

    entry = _cache
    is_fresh = (
        bool(entry)
        and entry[1] == path
        and current - entry[0] < IDENTITY_TTL_SECONDS
    )
    if is_fresh:
        identity = entry[2]
    else:
        identity = _read_identity(path)
        _cache = (current, path, identity)

    if not _env_disables(env):
        return identity

    # The opt-out is applied per call and never written back to the cache.
    return TelemetryIdentity(install_id=identity.install_id, enabled=False, path=path)
|
||||
|
||||
|
||||
def reset_identity_cache() -> None:
    """Forget the cached identity so the next load reads the file again."""
    global _cache

    _cache = None
|
||||
Loading…
Add table
Add a link
Reference in a new issue