rename klo to ktx

This commit is contained in:
Andrey Avtomonov 2026-05-10 23:51:24 +02:00
parent 1a42152e6f
commit 3ce510b55b
704 changed files with 10205 additions and 10255 deletions

View file

@ -0,0 +1,219 @@
#!/usr/bin/env python3
"""Generate semantic layer YAML sources from demo DB metadata.
Usage:
kubectl port-forward -n ktx-demo deployment/ktx-demo-db 5433:5432 &
KTX_DEMO_DB_PASSWORD=local-demo-password python scripts/gen_b2b_saas_model.py
"""
import os
import psycopg2
import yaml
# ID of the demo connection to export; main() selects the source_tables rows
# whose connection_id equals this value.
CONNECTION_ID = "256bc76b-cc47-4d5d-a9fc-5bcfb0364d44"
# Generated YAML files go to ../sources/b2b_saas relative to this script's
# directory.
OUTPUT_DIR = os.path.join(os.path.dirname(__file__), "..", "sources", "b2b_saas")
# Keyword arguments for psycopg2.connect(). Each value can be overridden with
# the KTX_DEMO_DB_* environment variable named on its line. The default port
# 5433 is the local side of the port-forward shown in the module docstring.
DB_PARAMS = {
"host": os.environ.get("KTX_DEMO_DB_HOST", "127.0.0.1"),
"port": int(os.environ.get("KTX_DEMO_DB_PORT", "5433")),
"user": os.environ.get("KTX_DEMO_DB_USER", "ktx-demo-user"),
"password": os.environ.get("KTX_DEMO_DB_PASSWORD", ""),
"dbname": os.environ.get("KTX_DEMO_DB_NAME", "ktx-demo-db"),
}
# Semantic-layer type for every recognised DB type name (keys are upper-case;
# callers upper-case the DB type before looking it up).
TYPE_MAP = {
    **dict.fromkeys(
        (
            "INTEGER",
            "FLOAT",
            "NUMERIC",
            "DECIMAL",
            "BIGINT",
            "SMALLINT",
            "DOUBLE",
            "REAL",
        ),
        "number",
    ),
    **dict.fromkeys(("VARCHAR", "TEXT", "CHAR"), "string"),
    **dict.fromkeys(
        ("DATE", "TIMESTAMP", "TIMESTAMPTZ", "DATETIME", "TIME"),
        "time",
    ),
    **dict.fromkeys(("BOOLEAN", "BOOL"), "boolean"),
}

# Substrings that mark a string-typed column as carrying a time value.
# Matching is a plain case-insensitive substring test against the column name.
TIME_PATTERNS = {
    "_at",
    "_date",
    "date",
    "timestamp",
    "created",
    "updated",
}


def is_time_column(name: str, db_type: str) -> bool:
    """Return True when the column should be given the ``time`` role.

    Args:
        name: Column name; compared case-insensitively with TIME_PATTERNS.
        db_type: DB type name; looked up case-insensitively in TYPE_MAP.
            Types missing from TYPE_MAP are treated as ``"string"``.

    Returns:
        True for native time types, and for string-typed columns whose name
        contains any TIME_PATTERNS fragment (e.g. ``created_at`` stored as
        VARCHAR). False for number and boolean columns regardless of name.
    """
    kind = TYPE_MAP.get(db_type.upper(), "string")
    if kind != "string":
        # Non-string types are decided by the type alone.
        return kind == "time"
    column = name.lower()
    for fragment in TIME_PATTERNS:
        if fragment in column:
            return True
    return False


def map_type(db_type: str, col_name: str) -> str:
    """Translate a DB type name into a semantic-layer type.

    Args:
        db_type: DB type name; looked up case-insensitively in TYPE_MAP.
        col_name: Column name, used to promote date-like string columns.

    Returns:
        ``"time"`` for recognised string types whose column name looks
        date-like, otherwise the TYPE_MAP entry; ``"string"`` for any type
        that TYPE_MAP does not list.
    """
    base = TYPE_MAP.get(db_type.upper())
    if base is None:
        # NOTE(review): for unrecognised types this returns "string" even when
        # is_time_column() would report True for the same column, so such a
        # column can end up with type "string" and role "time". Confirm which
        # of the two behaviours is intended.
        return "string"
    if base == "string" and is_time_column(col_name, db_type):
        return "time"
    return base
def _fetch_metadata():
    """Load tables, columns and join links for CONNECTION_ID from the demo DB.

    The connection is always closed, even when a query fails.

    Returns:
        A 4-tuple ``(tables, columns_by_table, col_id_to_info, joins_by_table)``:
        table id -> table name; table id -> list of column-info dicts;
        column id -> column-info dict; and from-table id -> list of link dicts.
        Links with either side outside CONNECTION_ID's tables are dropped.
    """
    conn = psycopg2.connect(**DB_PARAMS)
    try:
        with conn.cursor() as cur:
            # 1. Fetch tables
            cur.execute(
                "SELECT id, name FROM source_tables WHERE connection_id = %s ORDER BY name",
                (CONNECTION_ID,),
            )
            tables = {row[0]: row[1] for row in cur.fetchall()}
            table_ids = list(tables)

            # 2. Fetch columns (primary-key columns first within each table)
            cur.execute(
                """
                SELECT id, name, type, nullable, primary_key, table_id
                FROM source_columns
                WHERE table_id = ANY(%s::uuid[])
                ORDER BY table_id, primary_key DESC, name
                """,
                (table_ids,),
            )
            columns_by_table = {}
            col_id_to_info = {}
            for col_id, col_name, col_type, nullable, is_pk, table_id in cur.fetchall():
                info = {
                    "id": col_id,
                    "name": col_name,
                    "type": col_type,
                    "nullable": nullable,
                    "primary_key": is_pk,
                    "table_id": table_id,
                }
                col_id_to_info[col_id] = info
                columns_by_table.setdefault(table_id, []).append(info)

            # 3. Fetch links (joins), grouped by the table they start from
            cur.execute(
                """
                SELECT from_table_id, from_column_id, to_table_id, to_column_id, relationship_type
                FROM column_links
                WHERE from_table_id = ANY(%s::uuid[]) OR to_table_id = ANY(%s::uuid[])
                """,
                (table_ids, table_ids),
            )
            joins_by_table = {}
            for from_table_id, from_col_id, to_table_id, to_col_id, rel_type in cur.fetchall():
                # Only include joins where both sides are in our connection
                if from_table_id not in tables or to_table_id not in tables:
                    continue
                joins_by_table.setdefault(from_table_id, []).append(
                    {
                        "from_col_id": from_col_id,
                        "to_table_id": to_table_id,
                        "to_col_id": to_col_id,
                        "relationship_type": rel_type,
                    }
                )
    finally:
        # psycopg2's "with conn" only scopes a transaction; close explicitly.
        conn.close()
    return tables, columns_by_table, col_id_to_info, joins_by_table


def _select_grain(table_name, cols):
    """Pick grain columns: primary keys, else ``row_id``, else the first column.

    Falls back to ``<table_name>_id`` when the table has no columns at all.
    """
    pk_names = [c["name"] for c in cols if c["primary_key"]]
    if pk_names:
        return pk_names
    if any(c["name"] == "row_id" for c in cols):
        return ["row_id"]
    if cols:
        return [cols[0]["name"]]
    return [table_name + "_id"]


def _column_def(col):
    """Build the YAML column entry, adding ``role: time`` for time columns."""
    col_def = {"name": col["name"], "type": map_type(col["type"], col["name"])}
    if is_time_column(col["name"], col["type"]):
        col_def["role"] = "time"
    return col_def


def _build_joins(joins, tables, col_id_to_info):
    """Build the YAML join entries for one source table.

    Links whose target table or either column cannot be resolved are skipped.
    An ``alias`` (``<target>_<n>``) is added only when more than one emitted
    join points at the same target, so skipped links never force an alias.
    """
    resolved = []
    for link in joins:
        to_name = tables.get(link["to_table_id"])
        from_col = col_id_to_info.get(link["from_col_id"], {}).get("name")
        to_col = col_id_to_info.get(link["to_col_id"], {}).get("name")
        if to_name and from_col and to_col:
            resolved.append(
                (to_name, from_col, to_col, link["relationship_type"].lower())
            )

    # Count targets over the joins that will actually be emitted.
    per_target = {}
    for to_name, *_ in resolved:
        per_target[to_name] = per_target.get(to_name, 0) + 1

    seen = {}
    yaml_joins = []
    for to_name, from_col, to_col, rel in resolved:
        join_def = {
            "to": to_name,
            "on": f"{from_col} = {to_name}.{to_col}",
            "relationship": rel,
        }
        seen[to_name] = seen.get(to_name, 0) + 1
        if per_target[to_name] > 1:
            join_def["alias"] = f"{to_name}_{seen[to_name]}"
        yaml_joins.append(join_def)
    return yaml_joins


def main():
    """Generate one semantic-layer YAML source file per table of CONNECTION_ID.

    Reads table, column and link metadata from the demo database, then writes
    ``<table>.yaml`` files into OUTPUT_DIR (created if missing). Each file has
    the keys ``name``, ``table``, ``grain``, ``columns`` and, when the table
    has resolvable links, ``joins``.
    """
    tables, columns_by_table, col_id_to_info, joins_by_table = _fetch_metadata()

    # 4. Generate YAML files
    os.makedirs(OUTPUT_DIR, exist_ok=True)
    for table_id, table_name in sorted(tables.items(), key=lambda item: item[1]):
        cols = columns_by_table.get(table_id, [])
        source = {
            "name": table_name,
            "table": table_name,
            "grain": _select_grain(table_name, cols),
            "columns": [_column_def(c) for c in cols],
        }
        yaml_joins = _build_joins(
            joins_by_table.get(table_id, []), tables, col_id_to_info
        )
        if yaml_joins:
            source["joins"] = yaml_joins

        filepath = os.path.join(OUTPUT_DIR, f"{table_name}.yaml")
        # allow_unicode=True emits non-ASCII characters verbatim, so the file
        # must be opened as UTF-8 rather than with the locale's default.
        with open(filepath, "w", encoding="utf-8") as f:
            yaml.dump(
                source, f, default_flow_style=False, sort_keys=False, allow_unicode=True
            )
    print(f"Generated {len(tables)} source files in {OUTPUT_DIR}")


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,110 @@
#!/usr/bin/env python3
"""Run a semantic layer query against the b2b_saas SQLite database.
Usage:
uv run python scripts/slquery.py '{"measures":["count(opportunities.opportunity_id)"],"dimensions":["accounts.segment"]}'
uv run python scripts/slquery.py '{"measures":["churn_risk.avg_risk_score"],"dimensions":["accounts.industry"]}'
echo '{"measures":["sum(contracts.arr)"],"dimensions":["accounts.segment"]}' | uv run python scripts/slquery.py --stdin
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sqlite3
import sys
from pathlib import Path
from semantic_layer.engine import SemanticEngine
# Default sources directory: sources/b2b_saas under the parent of this
# script's directory (used as the default for --sources).
SOURCES_DIR = Path(__file__).resolve().parent.parent / "sources" / "b2b_saas"
# Default SQLite database (used as the default for --db); override with the
# KTX_B2B_SQLITE_DB environment variable. The fallback is a relative path, so
# it is resolved against the current working directory.
DB_PATH = Path(
os.environ.get("KTX_B2B_SQLITE_DB", "sample-data-generator/b2b_data.db")
).expanduser()
# sqlglot's sqlite dialect handles most transpilation, but has a few gaps.
# These fixups patch what sqlglot misses.

# A bare or dot-qualified identifier (``col``, ``tbl.col``). Matching only
# ``\w+`` would stop at the dot and wrap just the table name, producing
# invalid SQL such as ``julianday(tbl)...).col``.
_IDENT = r"(\w+(?:\.\w+)*)"

# (pattern, replacement) pairs applied in order by fixup_sqlite(). Order
# matters: the DATE(...) form must be rewritten before the generic
# ``CURRENT_DATE - x`` form, which would otherwise capture the word ``DATE``.
_SQLITE_FIXUPS = [
    # GROUP_CONCAT(DISTINCT x, sep) → GROUP_CONCAT(DISTINCT x) — sqlite
    # only allows 1 arg with DISTINCT
    (
        rf"GROUP_CONCAT\(DISTINCT {_IDENT},\s*'[^']*'\)",
        r"GROUP_CONCAT(DISTINCT \1)",
    ),
    # CURRENT_DATE - col → integer days via julianday
    (
        rf"CURRENT_DATE - DATE\({_IDENT}\)",
        r"CAST(julianday('now') - julianday(\1) AS INTEGER)",
    ),
    (
        rf"CURRENT_DATE - {_IDENT}",
        r"CAST(julianday('now') - julianday(\1) AS INTEGER)",
    ),
    # col - CURRENT_DATE → integer days via julianday
    (
        rf"{_IDENT} - CURRENT_DATE",
        r"CAST(julianday(\1) - julianday('now') AS INTEGER)",
    ),
    # CURRENT_DATE > col → julianday comparison
    (rf"CURRENT_DATE > {_IDENT}", r"julianday('now') > julianday(\1)"),
    # NULLS LAST — stripped from the generated SQL
    (r"\s+NULLS LAST", ""),
]


def fixup_sqlite(sql: str) -> str:
    """Apply every _SQLITE_FIXUPS rewrite to *sql*, in order.

    Args:
        sql: SQL text produced for the sqlite dialect.

    Returns:
        The SQL with each regex substitution applied; text that matches no
        pattern is returned unchanged.
    """
    for pattern, repl in _SQLITE_FIXUPS:
        sql = re.sub(pattern, repl, sql)
    return sql
def main() -> None:
    """Parse CLI arguments, build SQL for the query, and print the result.

    The JSON query comes from the positional argument or, with ``--stdin``,
    from standard input. With ``--sql-only`` the generated SQL is printed and
    nothing is executed. Otherwise the SQL runs against the SQLite database
    and the rows are printed as a fixed-width text table.

    Exits with status 2 (argparse error) when no query is given, the query is
    not valid JSON, or the database file does not exist; exits with status 1
    when SQLite rejects the generated SQL.
    """
    p = argparse.ArgumentParser(description="Run SL query against b2b_saas SQLite DB")
    p.add_argument("query", nargs="?", help="JSON query string")
    p.add_argument("--stdin", action="store_true", help="Read JSON from stdin")
    p.add_argument(
        "--sql-only", action="store_true", help="Print SQL without executing"
    )
    p.add_argument("--db", default=str(DB_PATH), help="Path to SQLite database")
    p.add_argument(
        "--sources", default=str(SOURCES_DIR), help="Path to sources directory"
    )
    args = p.parse_args()

    if args.stdin:
        raw_query = sys.stdin.read()
    elif args.query:
        raw_query = args.query
    else:
        p.error("Provide a JSON query string or use --stdin")
    try:
        query_dict = json.loads(raw_query)
    except json.JSONDecodeError as e:
        # Report malformed input as a usage error instead of a traceback.
        p.error(f"Query is not valid JSON: {e}")

    # Use sqlite dialect — sqlglot handles STRING_AGG→GROUP_CONCAT,
    # DECIMAL→REAL, ::DATE→DATE(), etc.
    engine = SemanticEngine(args.sources, dialect="sqlite")
    result = engine.query(query_dict)
    sql = fixup_sqlite(result.sql)
    if args.sql_only:
        print(sql)
        return

    # sqlite3.connect() creates an empty database file when the path does not
    # exist, which would hide a mistyped --db behind a "no such table" error.
    if args.db != ":memory:" and not Path(args.db).is_file():
        p.error(f"SQLite database not found: {args.db}")

    conn = sqlite3.connect(args.db)
    conn.row_factory = sqlite3.Row
    try:
        rows = conn.execute(sql).fetchall()
    except sqlite3.Error as e:
        print(f"SQL error: {e}", file=sys.stderr)
        print(f"\nGenerated SQL:\n{sql}", file=sys.stderr)
        sys.exit(1)
    finally:
        conn.close()

    if not rows:
        print("(no rows)")
        return

    cols = rows[0].keys()
    # Each column is as wide as its header or its widest value.
    widths = [max(len(str(c)), max(len(str(r[c])) for r in rows)) for c in cols]
    header = " ".join(str(c).ljust(w) for c, w in zip(cols, widths))
    sep = " ".join("-" * w for w in widths)
    print(header)
    print(sep)
    for r in rows:
        print(" ".join(str(r[c]).ljust(w) for c, w in zip(cols, widths)))


if __name__ == "__main__":
    main()

View file

@ -0,0 +1,166 @@
#!/usr/bin/env python3
"""Run TPC-H queries end-to-end: generate data + semantic layer SQL + execute.
Usage:
uv run python scripts/tpch_runner.py
"""
from __future__ import annotations
import json
import duckdb
import sqlglot
from semantic_layer.engine import SemanticEngine
# TPC-H table names; setup_tpch() creates a public.<name> view over
# main.<name> for each entry.
TPCH_TABLES = [
"region",
"nation",
"supplier",
"customer",
"part",
"partsupp",
"orders",
"lineitem",
]
def setup_tpch(sf: float = 0.01) -> duckdb.DuckDBPyConnection:
    """Build an in-memory DuckDB database populated with TPC-H data.

    Args:
        sf: Scale factor passed to DuckDB's ``dbgen``.

    Returns:
        An open connection in which every TPCH_TABLES entry is also reachable
        as a ``public.<table>`` view.
    """
    db = duckdb.connect()
    bootstrap = (
        "INSTALL tpch; LOAD tpch",
        f"CALL dbgen(sf={sf})",
        # YAML files use public.<table> — the views below live in this schema
        "CREATE SCHEMA IF NOT EXISTS public",
    )
    for statement in bootstrap:
        db.execute(statement)
    for table in TPCH_TABLES:
        db.execute(f"CREATE VIEW public.{table} AS SELECT * FROM main.{table}")
    return db
def run_query(
    conn: duckdb.DuckDBPyConnection,
    engine: SemanticEngine,
    title: str,
    query_dict: dict,
) -> None:
    """Generate SQL via semantic layer, execute it, and print results.

    Prints a banner with *title*, the request as JSON, the pretty-printed SQL
    with its dialect, and the result rows as a fixed-width table followed by
    the row count.

    Args:
        conn: Connection the generated SQL is executed on.
        engine: Semantic engine that turns *query_dict* into SQL.
        title: Heading printed above the output.
        query_dict: Semantic-layer query passed to ``engine.query``.
    """
    print(f"\n{'=' * 60}")
    print(f" {title}")
    print(f"{'=' * 60}")
    print("\n>> Request:")
    print(json.dumps(query_dict, indent=2))

    result = engine.query(query_dict)
    # Round-trip through the same dialect purely to pretty-print the SQL.
    formatted_sql = sqlglot.transpile(
        result.sql, read=result.dialect, write=result.dialect, pretty=True
    )[0]
    print(f"\n-- dialect: {result.dialect}")
    print(formatted_sql)

    cursor = conn.execute(result.sql)
    col_names = [desc[0] for desc in cursor.description]
    rows = cursor.fetchall()

    # Simple table formatting. The inner max() needs default=0: with an empty
    # result set there are no cell widths, and max() called with a single int
    # argument raises TypeError.
    widths = [
        max(len(str(c)), max((len(str(r[i])) for r in rows), default=0))
        for i, c in enumerate(col_names)
    ]
    header = " ".join(str(c).ljust(w) for c, w in zip(col_names, widths))
    print(f"\n{header}")
    print(" ".join("-" * w for w in widths))
    for row in rows:
        print(" ".join(str(v).ljust(w) for v, w in zip(row, widths)))
    print(f"\n({len(rows)} rows)")
def main() -> None:
    """Run the demo TPC-H queries against a freshly generated database.

    Builds the in-memory TPC-H database, loads the ``sources/tpch`` semantic
    model for the duckdb dialect, then runs each demo query in order through
    run_query().
    """
    conn = setup_tpch()
    engine = SemanticEngine("sources/tpch", dialect="duckdb")

    demos = [
        # Q1: Pricing summary by return flag / line status
        (
            "Q1: Pricing Summary",
            {
                "measures": [
                    "lineitem.revenue",
                    "lineitem.total_quantity",
                    "lineitem.avg_discount",
                    "lineitem.line_count",
                ],
                "dimensions": ["lineitem.l_returnflag", "lineitem.l_linestatus"],
            },
        ),
        # Q5-style: Revenue by nation (4-hop join) with ASIA filter
        (
            "Q5: Revenue by Nation (ASIA)",
            {
                "measures": ["lineitem.revenue"],
                "dimensions": ["nation.n_name"],
                "filters": ["region.r_name = 'ASIA'"],
            },
        ),
        # Q3-style: Revenue by order month for BUILDING segment
        (
            "Q3: Revenue by Month (BUILDING)",
            {
                "measures": ["lineitem.revenue"],
                "dimensions": [
                    {"field": "orders.o_orderdate", "granularity": "month"}
                ],
                "filters": ["customer.c_mktsegment = 'BUILDING'"],
                "limit": 12,
            },
        ),
        # Q10-style: Returned revenue by customer (filtered measure)
        (
            "Q10: Returned Revenue by Customer",
            {
                "measures": ["lineitem.returned_revenue"],
                "dimensions": ["customer.c_name"],
                "order_by": [
                    {"field": "lineitem.returned_revenue", "direction": "desc"}
                ],
                "limit": 10,
            },
        ),
        # Multi-measure: revenue + charge + counts
        (
            "Multi-measure: Revenue, Charge, Counts",
            {
                "measures": [
                    "lineitem.revenue",
                    "lineitem.charge",
                    "orders.order_count",
                ],
                "dimensions": ["customer.c_mktsegment"],
            },
        ),
        # Supply cost by nation (through partsupp bridge)
        (
            "Supply Cost by Nation",
            {
                "measures": ["partsupp.total_supply_cost"],
                "dimensions": ["nation.n_name"],
                "limit": 10,
            },
        ),
    ]
    for title, query in demos:
        run_query(conn, engine, title, query)


if __name__ == "__main__":
    main()