feat: emit query telemetry

This commit is contained in:
Andrey Avtomonov 2026-05-22 15:59:44 +02:00
parent 31efe1011d
commit 9956ce398f
6 changed files with 253 additions and 10 deletions

View file

@ -8,12 +8,13 @@ import { writeLocalKnowledgePage } from './context/wiki/local-knowledge.js';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { runKtxKnowledge } from './knowledge.js';
function makeIo() {
function makeIo(options: { isTTY?: boolean } = {}) {
let stdout = '';
let stderr = '';
return {
io: {
stdout: {
isTTY: options.isTTY,
write: (chunk: string) => {
stdout += chunk;
},
@ -72,6 +73,7 @@ describe('runKtxKnowledge', () => {
});
afterEach(async () => {
vi.unstubAllEnvs();
await rm(tempDir, { recursive: true, force: true });
});
@ -96,6 +98,26 @@ describe('runKtxKnowledge', () => {
expect(searchIo.stdout()).toContain('metrics-revenue');
});
it('emits debug telemetry for wiki search without query text', async () => {
  // Debug telemetry is switched on and CI is blanked before the command runs.
  // NOTE(review): presumably both are required for the event to be printed to stderr — confirm in the telemetry module.
  vi.stubEnv('KTX_TELEMETRY_DEBUG', '1');
  vi.stubEnv('CI', '');

  const projectDir = join(tempDir, 'project');
  await initKtxProject({ projectDir });
  await seedWikiPage(projectDir);

  const searchIo = makeIo({ isTTY: true });
  const query = 'revenue recognition';
  const exitCode = await runKtxKnowledge(
    { command: 'search', projectDir, query, userId: 'local', cliVersion: '0.0.0-test' },
    searchIo.io,
  );
  expect(exitCode).toBe(0);

  // The event may carry the query's length, but the query text itself must never appear.
  const stderrText = searchIo.stderr();
  expect(stderrText).toContain('"event":"wiki_query_completed"');
  expect(stderrText).toContain('"queryLength"');
  expect(stderrText).not.toContain(query);
});
it('prints wiki search rank badges in pretty output', async () => {
const projectDir = join(tempDir, 'rank-project');
await initKtxProject({ projectDir });

View file

@ -8,6 +8,7 @@ import {
} from './embedding-resolution.js';
import { resolveOutputMode } from './io/mode.js';
import { createRankBadgeFormatter, printList, type PrintListColumn } from './io/print-list.js';
import { emitTelemetryEvent } from './telemetry/index.js';
export type KtxKnowledgeArgs =
| { command: 'list'; projectDir: string; userId: string; output?: string; json?: boolean; cliVersion: string }
@ -108,6 +109,7 @@ export async function runKtxKnowledge(
io: KtxKnowledgeIo = process,
deps: KtxKnowledgeDeps = {},
): Promise<number> {
const startedAt = performance.now();
try {
const project = await loadKtxProject({ projectDir: args.projectDir });
if (args.command === 'list') {
@ -135,6 +137,17 @@ export async function runKtxKnowledge(
embeddingService,
limit: args.limit,
});
await emitTelemetryEvent({
name: 'wiki_query_completed',
projectDir: args.projectDir,
io,
fields: {
queryLength: args.query.length,
resultCount: results.length,
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'ok',
},
});
if (args.debug) {
writeWikiSearchDebug(io, {
mode: project.config.storage.search,
@ -167,6 +180,19 @@ export async function runKtxKnowledge(
}
return 0;
} catch (error) {
if (args.command === 'search') {
await emitTelemetryEvent({
name: 'wiki_query_completed',
projectDir: args.projectDir,
io,
fields: {
queryLength: args.query.length,
resultCount: 0,
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'error',
},
});
}
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
return 1;
}

View file

@ -18,12 +18,13 @@ const ORDERS_YAML = [
'',
].join('\n');
function makeIo() {
function makeIo(options: { isTTY?: boolean } = {}) {
let stdout = '';
let stderr = '';
return {
io: {
stdout: {
isTTY: options.isTTY,
write: (chunk: string) => {
stdout += chunk;
},
@ -63,6 +64,7 @@ describe('runKtxSl', () => {
});
afterEach(async () => {
vi.unstubAllEnvs();
await rm(tempDir, { recursive: true, force: true });
});
@ -289,6 +291,43 @@ joins: []
expect(stderr.write).not.toHaveBeenCalled();
});
it('emits debug telemetry for sl query without project paths', async () => {
  // Debug telemetry is switched on and CI is blanked before the command runs.
  vi.stubEnv('KTX_TELEMETRY_DEBUG', '1');
  vi.stubEnv('CI', '');

  const projectDir = join(tempDir, 'project');
  await seedSlSource({ projectDir });
  const io = makeIo({ isTTY: true });

  // Stub compute port: `query` resolves to a fixed compiled result; the other methods are bare mocks.
  const createSemanticLayerCompute = vi.fn(() => {
    return {
      query: vi.fn(async () => {
        return {
          sql: 'select count(*) as order_count from public.orders',
          dialect: 'postgres',
          columns: [{ name: 'orders.order_count' }],
          plan: {},
        };
      }),
      validateSources: vi.fn(),
      generateSources: vi.fn(),
    };
  });

  await expect(
    runKtxSl(
      {
        command: 'query',
        projectDir,
        connectionId: 'warehouse',
        query: { measures: ['orders.order_count'], dimensions: [] },
        format: 'json',
        execute: false,
        cliVersion: '0.2.0',
        runtimeInstallPolicy: 'auto',
      },
      io.io,
      { createSemanticLayerCompute },
    ),
  ).resolves.toBe(0);

  // The event must be present, and the absolute project path must not leak into it.
  const stderrText = io.stderr();
  expect(stderrText).toContain('"event":"sl_query_completed"');
  expect(stderrText).not.toContain(projectDir);
});
it('runs sl query from a JSON query file', async () => {
const projectDir = join(tempDir, 'project');
const project = await initKtxProject({ projectDir });

View file

@ -1,4 +1,5 @@
import { readFile } from 'node:fs/promises';
import type { KtxCliIo } from './cli-runtime.js';
import { createDefaultLocalQueryExecutor } from './context/connections/local-query-executor.js';
import type { KtxSqlQueryExecutorPort } from './context/connections/query-executor.js';
import { KtxIngestEmbeddingPortAdapter } from './context/llm/embedding-port.js';
@ -18,6 +19,8 @@ import {
type KtxManagedPythonInstallPolicy,
} from './managed-python-command.js';
import { profileMark } from './startup-profile.js';
import { emitTelemetryEvent } from './telemetry/index.js';
import { scrubErrorClass } from './telemetry/scrubber.js';
profileMark('module:sl');
@ -56,10 +59,7 @@ export type KtxSlArgs =
runtimeInstallPolicy: KtxManagedPythonInstallPolicy;
};
interface KtxSlIo {
stdout: { write(chunk: string): void };
stderr: { write(chunk: string): void };
}
type KtxSlIo = KtxCliIo;
interface KtxSlDeps {
loadProject?: typeof loadKtxProject;
@ -85,6 +85,14 @@ function resolutionToEmbeddingPort(resolution: EmbeddingProviderResolution): Ktx
return null;
}
/** Returns how many measures the query lists, or 0 when `measures` is not an array. */
function queryMeasureCount(query: SemanticLayerQueryInput): number {
  const { measures } = query;
  if (!Array.isArray(measures)) {
    return 0;
  }
  return measures.length;
}
/** Returns how many dimensions the query lists, or 0 when `dimensions` is not an array. */
function queryDimensionCount(query: SemanticLayerQueryInput): number {
  const { dimensions } = query;
  if (!Array.isArray(dimensions)) {
    return 0;
  }
  return dimensions.length;
}
async function printSlSources(input: {
rows: ReadonlyArray<LocalSlSourceSummary>;
command: 'sl list';
@ -177,6 +185,8 @@ async function readSlQueryFile(path: string): Promise<SemanticLayerQueryInput> {
}
export async function runKtxSl(args: KtxSlArgs, io: KtxSlIo = process, deps: KtxSlDeps = {}): Promise<number> {
const startedAt = performance.now();
let queryForTelemetry: SemanticLayerQueryInput | undefined;
try {
const project = await (deps.loadProject ?? loadKtxProject)({ projectDir: args.projectDir });
if (args.command === 'list') {
@ -234,6 +244,18 @@ export async function runKtxSl(args: KtxSlArgs, io: KtxSlIo = process, deps: Ktx
connectionId: args.connectionId,
sourceName: args.sourceName,
});
await emitTelemetryEvent({
name: 'sl_validate_completed',
projectDir: args.projectDir,
io,
fields: {
sourceCount: source ? 1 : 0,
modelCount: 0,
validationErrorCount: result.valid ? 0 : result.errors.length,
outcome: result.valid ? 'ok' : 'error',
durationMs: Math.max(0, performance.now() - startedAt),
},
});
if (!result.valid) {
for (const error of result.errors) {
io.stderr.write(`${error}\n`);
@ -248,6 +270,7 @@ export async function runKtxSl(args: KtxSlArgs, io: KtxSlIo = process, deps: Ktx
if (!query) {
throw new Error('sl query requires query input from --query-file or at least one --measure');
}
queryForTelemetry = query;
const compute = deps.createSemanticLayerCompute
? deps.createSemanticLayerCompute()
: await (deps.createManagedSemanticLayerCompute ?? createManagedPythonSemanticLayerComputePort)({
@ -264,6 +287,19 @@ export async function runKtxSl(args: KtxSlArgs, io: KtxSlIo = process, deps: Ktx
maxRows: args.maxRows,
queryExecutor,
});
await emitTelemetryEvent({
name: 'sl_query_completed',
projectDir: args.projectDir,
io,
fields: {
mode: args.execute ? 'execute' : 'compile',
referencedSourceCount: result.plan && typeof result.plan === 'object' ? 1 : 0,
referencedDimensionCount: queryDimensionCount(query),
referencedMeasureCount: queryMeasureCount(query),
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'ok',
},
});
if (args.format === 'sql') {
io.stdout.write(`${result.sql}\n`);
return 0;
@ -274,6 +310,39 @@ export async function runKtxSl(args: KtxSlArgs, io: KtxSlIo = process, deps: Ktx
const _exhaustive: never = args;
throw new Error(`Unsupported sl command: ${JSON.stringify(_exhaustive)}`);
} catch (error) {
if (args.command === 'validate') {
const errorClass = scrubErrorClass(error);
await emitTelemetryEvent({
name: 'sl_validate_completed',
projectDir: args.projectDir,
io,
fields: {
sourceCount: 0,
modelCount: 0,
validationErrorCount: 0,
outcome: 'error',
...(errorClass ? { errorClass } : {}),
durationMs: Math.max(0, performance.now() - startedAt),
},
});
}
if (args.command === 'query') {
const errorClass = scrubErrorClass(error);
await emitTelemetryEvent({
name: 'sl_query_completed',
projectDir: args.projectDir,
io,
fields: {
mode: args.execute ? 'execute' : 'compile',
referencedSourceCount: 0,
referencedDimensionCount: queryForTelemetry ? queryDimensionCount(queryForTelemetry) : 0,
referencedMeasureCount: queryForTelemetry ? queryMeasureCount(queryForTelemetry) : 0,
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'error',
...(errorClass ? { errorClass } : {}),
},
});
}
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
return 1;
}

View file

@ -8,12 +8,13 @@ import type { SqlAnalysisPort } from './context/sql-analysis/ports.js';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { runKtxSql } from './sql.js';
function makeIo() {
function makeIo(options: { isTTY?: boolean } = {}) {
let stdout = '';
let stderr = '';
return {
io: {
stdout: {
isTTY: options.isTTY,
write: (chunk: string) => {
stdout += chunk;
},
@ -31,7 +32,12 @@ function makeIo() {
function makeSqlAnalysis(result: Awaited<ReturnType<SqlAnalysisPort['validateReadOnly']>>): SqlAnalysisPort {
return {
analyzeForFingerprint: vi.fn(),
analyzeForFingerprint: vi.fn(async () => ({
fingerprint: 'select-from-orders',
normalizedSql: 'select id, status from orders',
tablesTouched: ['orders'],
literalSlots: [],
})),
analyzeBatch: vi.fn(),
validateReadOnly: vi.fn(async () => result),
};
@ -76,6 +82,7 @@ describe('runKtxSql', () => {
});
afterEach(async () => {
vi.unstubAllEnvs();
await rm(tempDir, { recursive: true, force: true });
});
@ -130,6 +137,39 @@ describe('runKtxSql', () => {
expect(io.stderr()).toBe('');
});
it('emits debug telemetry for SQL without raw query text', async () => {
  // Debug telemetry is switched on and CI is blanked before the command runs.
  vi.stubEnv('KTX_TELEMETRY_DEBUG', '1');
  vi.stubEnv('CI', '');

  const projectDir = join(tempDir, 'project');
  await initKtxProject({ projectDir });
  await writeConnections(projectDir, { warehouse: { driver: 'sqlite', path: 'warehouse.db' } });
  const io = makeIo({ isTTY: true });

  const exitCode = await runKtxSql(
    {
      command: 'execute',
      projectDir,
      connectionId: 'warehouse',
      sql: 'select count(*) from orders',
      maxRows: 10,
      output: 'json',
      json: true,
      cliVersion: '0.0.0-test',
    },
    io.io,
    {
      createSqlAnalysis: () => makeSqlAnalysis({ ok: true, error: null }),
      createScanConnector: vi.fn(async () => makeConnector()),
    },
  );
  expect(exitCode).toBe(0);

  // Only the coarse verb may be reported; the statement text itself must stay out of stderr.
  const stderrText = io.stderr();
  expect(stderrText).toContain('"event":"sql_completed"');
  expect(stderrText).toContain('"queryVerb":"select"');
  expect(stderrText).not.toContain('select count(*)');
});
it('prints JSON output', async () => {
const projectDir = join(tempDir, 'project');
await initKtxProject({ projectDir });

View file

@ -6,6 +6,9 @@ import { type KtxOutputMode, resolveOutputMode } from './io/mode.js';
import { createKtxCliScanConnector } from './local-scan-connectors.js';
import { createManagedDaemonSqlAnalysisPort } from './managed-python-http.js';
import { profileMark } from './startup-profile.js';
import { isDemoConnection } from './telemetry/demo-detect.js';
import { emitTelemetryEvent } from './telemetry/index.js';
import { scrubErrorClass } from './telemetry/scrubber.js';
profileMark('module:sql');
@ -54,6 +57,14 @@ function sqlAnalysisDialectForDriver(driver: string | undefined): SqlAnalysisDia
return map[normalized] ?? 'postgres';
}
/**
 * Classifies a SQL statement by its leading keyword so telemetry can report a
 * coarse verb instead of the statement text.
 *
 * Leading whitespace, `--` line comments, block comments and opening
 * parentheses are skipped before the first identifier is read, so inputs such
 * as `select*from t`, `(select 1) union (select 2)` or a statement preceded by
 * a comment are still classified by their real verb.
 *
 * @param sql Raw SQL text as supplied by the caller.
 * @returns `'select'`, `'explain'`, `'show'` or `'with'` (matched
 *   case-insensitively), or `'other'` for anything else, including empty input.
 */
function queryVerb(sql: string): 'select' | 'explain' | 'show' | 'with' | 'other' {
  // Each skipped alternative starts with a distinct character and consumes at
  // least one, so the repetition cannot backtrack pathologically. The capture
  // takes the whole identifier, so `selection` or `select_1` is not read as `select`.
  const leading = /^(?:\s|--[^\n]*|\/\*[\s\S]*?\*\/|\()*([A-Za-z_][A-Za-z0-9_]*)/.exec(sql);
  const first = leading?.[1]?.toLowerCase();
  if (first === 'select' || first === 'explain' || first === 'show' || first === 'with') {
    return first;
  }
  return 'other';
}
function formatValue(value: unknown): string {
if (value === null || value === undefined) return '';
if (typeof value === 'string') return value;
@ -119,14 +130,19 @@ function resultOutput(connectionId: string, result: KtxQueryResult): SqlExecutio
}
export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps: KtxSqlDeps = {}): Promise<number> {
const startedAt = performance.now();
let driver = 'unknown';
let demoConnection = false;
try {
const project = await (deps.loadProject ?? loadKtxProject)({ projectDir: args.projectDir });
const connection = project.config.connections[args.connectionId];
if (!connection) {
throw new Error(`Connection "${args.connectionId}" is not configured in ktx.yaml`);
}
driver = String(connection.driver ?? 'unknown').toLowerCase();
demoConnection = isDemoConnection(args.connectionId, connection);
const sqlAnalysis =
const createSqlAnalysis =
deps.createSqlAnalysis ??
(() =>
createManagedDaemonSqlAnalysisPort({
@ -135,10 +151,13 @@ export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps:
installPolicy: 'auto',
io,
}));
const validation = await sqlAnalysis().validateReadOnly(args.sql, sqlAnalysisDialectForDriver(connection.driver));
const analysisPort = createSqlAnalysis();
const dialect = sqlAnalysisDialectForDriver(connection.driver);
const validation = await analysisPort.validateReadOnly(args.sql, dialect);
if (!validation.ok) {
throw new Error(validation.error ?? 'SQL is not read-only.');
}
const analysis = await analysisPort.analyzeForFingerprint(args.sql, dialect);
const createScanConnector = deps.createScanConnector ?? createKtxCliScanConnector;
let connector: KtxScanConnector | null = null;
@ -157,11 +176,39 @@ export async function runKtxSql(args: KtxSqlArgs, io: KtxCliIo = process, deps:
);
const mode = resolveOutputMode({ explicit: args.output, json: args.json, io });
printSqlResult(resultOutput(args.connectionId, result), mode, io);
await emitTelemetryEvent({
name: 'sql_completed',
projectDir: args.projectDir,
io,
fields: {
driver,
isDemoConnection: demoConnection,
queryVerb: queryVerb(args.sql),
referencedTableCount: analysis.tablesTouched.length,
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'ok',
},
});
return 0;
} finally {
await cleanupConnector(connector);
}
} catch (error) {
const errorClass = scrubErrorClass(error);
await emitTelemetryEvent({
name: 'sql_completed',
projectDir: args.projectDir,
io,
fields: {
driver,
isDemoConnection: demoConnection,
queryVerb: queryVerb(args.sql),
referencedTableCount: 0,
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'error',
...(errorClass ? { errorClass } : {}),
},
});
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
return 1;
}