fix(telemetry): classify daemon query rejections as expected, not faults (#335)

* fix(telemetry): classify daemon query rejections as expected, not faults

Semantic-layer query rejections and warehouse-execution rejections from the
sl_query MCP tool were wrapped as generic Errors, so reportException filed them
as PostHog $exception faults indistinguishable from real ktx bugs.

The daemon already separates a caller rejection (planner ValueError -> exit 3 /
HTTP 400) from a crash. The Node runner now carries that distinction as a typed
KtxDaemonComputeError, and a shared throwClassifiedQueryError promotes daemon
input-rejections and warehouse rejections to KtxQueryError while daemon crashes
and native JS faults still reach Error Tracking. query_semantic_layer stops
report_exception-ing expected ValueErrors, and a missing 'file:' secret now
raises KtxExpectedError so absent .ktx/secrets/<conn>-password stops filing
faults.

* chore: sync uv.lock to ktx-daemon/ktx-sl 0.15.0
This commit is contained in:
Andrey Avtomonov 2026-07-03 22:39:34 +02:00 committed by GitHub
parent a651b82e2f
commit 5d17469601
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 347 additions and 28 deletions

View file

@ -1,6 +1,7 @@
import { readFileSync } from 'node:fs';
import { homedir } from 'node:os';
import { resolve } from 'node:path';
import { KtxExpectedError } from '../../errors.js';
/** @internal */
export function resolveKtxHomePath(path: string): string {
@ -28,7 +29,15 @@ export function resolveKtxConfigReference(value: string | undefined, env: NodeJS
if (value.startsWith('file:')) {
const filePath = resolveKtxHomePath(value.slice('file:'.length).trim());
const fileValue = readFileSync(filePath, 'utf8').trim();
let fileValue: string;
try {
fileValue = readFileSync(filePath, 'utf8').trim();
} catch (error) {
// A `file:` secret whose target is missing or unreadable is a config
// problem (secrets/ is gitignored, so absent after a clone), not a ktx
// fault — classify it so it stays out of Error Tracking.
throw new KtxExpectedError(`Could not read the secret file referenced in ktx.yaml: ${filePath}`, { cause: error });
}
return fileValue.length > 0 ? fileValue : undefined;
}

View file

@ -4,6 +4,44 @@ import { URL } from 'node:url';
import { spawn } from 'node:child_process';
import type { ResolvedSemanticLayerSource, SemanticLayerQueryInput } from '../sl/types.js';
// The daemon signals "the caller sent a well-formed request I refused as
// invalid" with a distinct subprocess exit code (INPUT_REJECTED_EXIT_CODE in
// python/ktx-daemon/__main__.py) or HTTP 400. Kept in sync across the boundary.
const DAEMON_INPUT_REJECTED_EXIT_CODE = 3;
const DAEMON_INPUT_REJECTED_HTTP_STATUS = 400;
/**
* A ktx-daemon compute call failed. `inputRejected` is true when the daemon
* refused the request as invalid (a caller-driven outcome) rather than faulting;
* callers can promote those to an expected KtxQueryError while letting genuine
* daemon crashes reach Error Tracking. `detail` is the daemon's own message,
* unwrapped for surfacing to the caller.
*/
export class KtxDaemonComputeError extends Error {
readonly inputRejected: boolean;
readonly detail: string;
constructor(message: string, options: { inputRejected: boolean; detail: string; cause?: unknown }) {
super(message, options.cause !== undefined ? { cause: options.cause } : undefined);
this.name = 'KtxDaemonComputeError';
this.inputRejected = options.inputRejected;
this.detail = options.detail;
}
}
/** The daemon reports HTTP failures as `{ "detail": "…" }`; fall back to the raw body. */
function daemonHttpErrorDetail(raw: string): string {
try {
const parsed = JSON.parse(raw) as unknown;
if (parsed && typeof parsed === 'object' && typeof (parsed as { detail?: unknown }).detail === 'string') {
return (parsed as { detail: string }).detail;
}
} catch {
// Non-JSON body (e.g. a proxy error page) — surface it verbatim.
}
return raw;
}
interface KtxSemanticLayerComputeQueryResult {
sql: string;
dialect: string;
@ -128,7 +166,13 @@ function runProcessJson(
const stdoutText = Buffer.concat(stdout).toString('utf8').trim();
const stderrText = Buffer.concat(stderr).toString('utf8').trim();
if (code !== 0) {
reject(new Error(`ktx-daemon ${subcommand} failed: ${stderrText || `exit code ${code}`}`));
const detail = stderrText || `exit code ${code}`;
reject(
new KtxDaemonComputeError(`ktx-daemon ${subcommand} failed: ${detail}`, {
inputRejected: code === DAEMON_INPUT_REJECTED_EXIT_CODE,
detail,
}),
);
return;
}
try {
@ -168,7 +212,12 @@ function postJson(baseUrl: string): KtxDaemonHttpJsonRunner {
const text = Buffer.concat(chunks).toString('utf8');
const statusCode = response.statusCode ?? 0;
if (statusCode < 200 || statusCode >= 300) {
reject(new Error(`ktx-daemon HTTP ${path} failed with ${statusCode}: ${text}`));
reject(
new KtxDaemonComputeError(`ktx-daemon HTTP ${path} failed with ${statusCode}: ${text}`, {
inputRejected: statusCode === DAEMON_INPUT_REJECTED_HTTP_STATUS,
detail: daemonHttpErrorDetail(text),
}),
);
return;
}
try {

View file

@ -1,5 +1,5 @@
import type { KtxSqlQueryExecutorPort } from '../../context/connections/query-executor.js';
import { KtxExpectedError } from '../../errors.js';
import { KtxExpectedError, KtxQueryError, isNativeProgrammingFault } from '../../errors.js';
import { isDatabaseDriver, normalizeConnectionDriver } from '../../connection-drivers.js';
import { sqlDialectNotes } from '../../context/sql-analysis/dialect-notes.js';
import type { KtxProjectConnectionConfig } from '../../context/project/config.js';
@ -11,7 +11,7 @@ import {
localConnectionInfoFromConfig,
} from '../../context/connections/local-warehouse-descriptor.js';
import type { KtxEmbeddingPort } from '../../context/core/embedding.js';
import type { KtxSemanticLayerComputePort } from '../../context/daemon/semantic-layer-compute.js';
import { KtxDaemonComputeError, type KtxSemanticLayerComputePort } from '../../context/daemon/semantic-layer-compute.js';
import type { KtxLocalProject } from '../../context/project/project.js';
import { createKtxEntityDetailsService } from '../../context/scan/entity-details.js';
import type { LocalScanMcpOptions } from '../../context/scan/local-scan.js';
@ -34,6 +34,26 @@ interface CreateLocalProjectMcpContextPortsOptions {
embeddingService: KtxEmbeddingPort | null;
}
/**
* Reclassify a query-path failure. Warehouse/driver rejections and daemon
* input-rejections are caller-driven outcomes (KtxQueryError, kept out of Error
* Tracking while preserving the underlying diagnostics); native JS faults, daemon
* crashes, and already-expected errors propagate unchanged so genuine ktx bugs
* still reach Error Tracking.
*/
function throwClassifiedQueryError(error: unknown): never {
if (error instanceof KtxDaemonComputeError) {
if (error.inputRejected) {
throw new KtxQueryError(error.detail, { cause: error });
}
throw error;
}
if (isNativeProgrammingFault(error) || error instanceof KtxExpectedError) {
throw error;
}
throw new KtxQueryError(error instanceof Error ? error.message : String(error), { cause: error });
}
async function executeValidatedReadOnlySql(
project: KtxLocalProject,
options: CreateLocalProjectMcpContextPortsOptions,
@ -170,15 +190,19 @@ export function createLocalProjectMcpContextPorts(
if (!options.semanticLayerCompute) {
throw new Error('sl_query requires a semantic-layer query adapter.');
}
return compileLocalSlQuery(project, {
connectionId: input.connectionId,
query: input.query,
compute: options.semanticLayerCompute,
execute: Boolean(options.queryExecutor),
maxRows: input.query.limit,
queryExecutor: options.queryExecutor,
onProgress: executionOptions?.onProgress,
});
try {
return await compileLocalSlQuery(project, {
connectionId: input.connectionId,
query: input.query,
compute: options.semanticLayerCompute,
execute: Boolean(options.queryExecutor),
maxRows: input.query.limit,
queryExecutor: options.queryExecutor,
onProgress: executionOptions?.onProgress,
});
} catch (error) {
throwClassifiedQueryError(error);
}
},
},
entityDetails: {

View file

@ -3,6 +3,7 @@ import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { describe, expect, it } from 'vitest';
import { resolveKtxConfigReference, resolveKtxHomePath } from '../../../src/context/core/config-reference.js';
import { KtxExpectedError } from '../../../src/errors.js';
describe('ktx config references', () => {
it('resolves env references without returning empty values', () => {
@ -22,6 +23,17 @@ describe('ktx config references', () => {
expect(resolveKtxConfigReference(`file:${keyPath}`, {})).toBe('file-gateway-key');
});
it('raises an expected error when a file reference is missing', () => {
const missing = join(tmpdir(), `ktx-config-reference-missing-${process.pid}`, 'absent-password');
try {
resolveKtxConfigReference(`file:${missing}`, {});
expect.unreachable('expected a thrown error for the missing secret file');
} catch (error) {
expect(error).toBeInstanceOf(KtxExpectedError);
expect((error as Error).message).toContain(missing);
}
});
it('returns literal values unchanged after trimming blank-only values', () => {
expect(resolveKtxConfigReference('provider/model', {})).toBe('provider/model');
expect(resolveKtxConfigReference(' ', {})).toBeUndefined();

View file

@ -1,7 +1,11 @@
import { once } from 'node:events';
import { createServer } from 'node:http';
import { describe, expect, it, vi } from 'vitest';
import { createHttpSemanticLayerComputePort, createPythonSemanticLayerComputePort } from '../../../src/context/daemon/semantic-layer-compute.js';
import {
createHttpSemanticLayerComputePort,
createPythonSemanticLayerComputePort,
KtxDaemonComputeError,
} from '../../../src/context/daemon/semantic-layer-compute.js';
const source = {
name: 'orders',
@ -174,6 +178,79 @@ describe('createPythonSemanticLayerComputePort', () => {
});
});
describe('KtxDaemonComputeError classification', () => {
const query = { sources: [source], dialect: 'postgres', query: { measures: ['count(*)'], dimensions: [] } };
function exitingPort(code: number, stderr: string) {
return createPythonSemanticLayerComputePort({
command: process.execPath,
args: [
'-e',
`process.stdin.on('data',()=>{});process.stdin.on('end',()=>{process.stderr.write(${JSON.stringify(stderr)});process.exit(${code})});`,
],
});
}
async function rejection(promise: Promise<unknown>): Promise<KtxDaemonComputeError> {
const error = await promise.then(
() => null,
(thrown: unknown) => thrown,
);
expect(error).toBeInstanceOf(KtxDaemonComputeError);
return error as KtxDaemonComputeError;
}
it('marks a subprocess input-rejection (exit 3) as inputRejected', async () => {
const error = await rejection(exitingPort(3, 'Measure expr does not reference any source').query(query));
expect(error.inputRejected).toBe(true);
expect(error.detail).toContain('does not reference any source');
});
it('marks a subprocess fault (exit 1) as not inputRejected', async () => {
const error = await rejection(exitingPort(1, 'Traceback: boom').query(query));
expect(error.inputRejected).toBe(false);
expect(error.detail).toContain('boom');
});
async function statusPort(statusCode: number, body: string): Promise<{ port: ReturnType<typeof createHttpSemanticLayerComputePort>; close: () => void }> {
const server = createServer((_request, response) => {
response.writeHead(statusCode, { 'content-type': 'application/json' });
response.end(body);
});
server.listen(0, '127.0.0.1');
await once(server, 'listening');
const address = server.address();
if (!address || typeof address === 'string') {
throw new Error('expected TCP server address');
}
return {
port: createHttpSemanticLayerComputePort({ baseUrl: `http://127.0.0.1:${address.port}` }),
close: () => server.close(),
};
}
it('marks an HTTP 400 as inputRejected and unwraps the daemon detail', async () => {
const { port, close } = await statusPort(400, JSON.stringify({ detail: 'Measure expr does not reference any source' }));
try {
const error = await rejection(port.query(query));
expect(error.inputRejected).toBe(true);
expect(error.detail).toBe('Measure expr does not reference any source');
} finally {
close();
}
});
it('marks an HTTP 500 as not inputRejected', async () => {
const { port, close } = await statusPort(500, JSON.stringify({ detail: 'Daemon request failed: boom' }));
try {
const error = await rejection(port.query(query));
expect(error.inputRejected).toBe(false);
} finally {
close();
}
});
});
describe('createHttpSemanticLayerComputePort', () => {
it('calls semantic query and validate HTTP endpoints through an injected runner', async () => {
const requestJson = vi.fn(async (path: string) => {

View file

@ -4,6 +4,7 @@ import { join } from 'node:path';
import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest';
import { initKtxProject } from '../../../src/context/project/project.js';
import { KtxExpectedError, KtxQueryError } from '../../../src/errors.js';
import { KtxDaemonComputeError } from '../../../src/context/daemon/semantic-layer-compute.js';
import { createKtxConnectorCapabilities, type KtxQueryResult, type KtxScanConnector, type KtxSchemaSnapshot } from '../../../src/context/scan/types.js';
import { SemanticLayerService } from '../../../src/context/sl/semantic-layer.service.js';
import type { SemanticLayerSource } from '../../../src/context/sl/types.js';
@ -1177,4 +1178,86 @@ describe('createLocalProjectMcpContextPorts', () => {
}),
);
});
async function seedOrdersWarehouse() {
const project = await initKtxProject({ projectDir: tempDir });
project.config.connections.warehouse = { driver: 'postgres', url: 'env:DATABASE_URL' };
await seedSlSourceFile(project, {
connectionId: 'warehouse',
sourceName: 'orders',
yaml: ['name: orders', 'table: public.orders', 'grain:', ' - id', 'columns:', ' - name: id', ' type: number', 'joins: []', 'measures: []', ''].join('\n'),
});
return project;
}
it('promotes a daemon input-rejection to an expected KtxQueryError carrying the daemon detail', async () => {
const project = await seedOrdersWarehouse();
const semanticLayerCompute = {
validateSources: vi.fn(),
generateSources: vi.fn(),
query: vi.fn(async () => {
throw new KtxDaemonComputeError("ktx-daemon semantic-query failed: Measure expr 'count(*)' does not reference any source", {
inputRejected: true,
detail: "Measure expr 'count(*)' does not reference any source",
});
}),
};
const ports = createLocalProjectMcpContextPorts(project, { semanticLayerCompute, embeddingService: null });
const rejection = ports.semanticLayer?.query({
connectionId: 'warehouse',
query: { measures: [{ expr: 'count(*)', name: 'n' }], dimensions: [] },
});
await expect(rejection).rejects.toBeInstanceOf(KtxQueryError);
await expect(rejection).rejects.toThrow("Measure expr 'count(*)' does not reference any source");
});
it('leaves a daemon crash as an unexpected fault', async () => {
const project = await seedOrdersWarehouse();
const semanticLayerCompute = {
validateSources: vi.fn(),
generateSources: vi.fn(),
query: vi.fn(async () => {
throw new KtxDaemonComputeError('ktx-daemon semantic-query failed: KeyError: boom', {
inputRejected: false,
detail: 'KeyError: boom',
});
}),
};
const ports = createLocalProjectMcpContextPorts(project, { semanticLayerCompute, embeddingService: null });
const rejection = ports.semanticLayer?.query({
connectionId: 'warehouse',
query: { measures: ['orders.order_count'], dimensions: [] },
});
await expect(rejection).rejects.toBeInstanceOf(KtxDaemonComputeError);
await expect(rejection).rejects.not.toBeInstanceOf(KtxQueryError);
});
it('wraps a warehouse execution rejection from sl_query as KtxQueryError', async () => {
const project = await seedOrdersWarehouse();
const semanticLayerCompute = {
validateSources: vi.fn(),
generateSources: vi.fn(),
query: vi.fn(async () => ({
sql: 'select count(*) from public.orders',
dialect: 'postgres',
columns: [{ name: 'orders.order_count' }],
plan: {},
})),
};
const queryExecutor = {
execute: vi.fn(async () => {
throw new Error("Unknown column '검사 유형' in 'SELECT'");
}),
};
const ports = createLocalProjectMcpContextPorts(project, { semanticLayerCompute, queryExecutor, embeddingService: null });
const rejection = ports.semanticLayer?.query({
connectionId: 'warehouse',
query: { measures: ['orders.order_count'], dimensions: [], limit: 5 },
});
await expect(rejection).rejects.toBeInstanceOf(KtxQueryError);
await expect(rejection).rejects.toThrow("Unknown column '검사 유형' in 'SELECT'");
});
});

View file

@ -35,6 +35,12 @@ from ktx_daemon.source_generation import (
generate_sources_response,
)
# The caller (the Node client) sent a well-formed request the compute layer
# refused as invalid — e.g. the planner rejecting an agent's query. A distinct
# exit code lets the client classify it as an expected outcome rather than a
# ktx fault. Kept in sync with DAEMON_INPUT_REJECTED_EXIT_CODE on the Node side.
INPUT_REJECTED_EXIT_CODE = 3
def build_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(prog="ktx-daemon")
@ -210,9 +216,17 @@ def main(argv: list[str] | None = None) -> int:
return 2
sys.stdout.write(response.model_dump_json() + "\n")
return 0
except (json.JSONDecodeError, ValidationError, ValueError) as error:
except (json.JSONDecodeError, ValidationError) as error:
# Malformed request envelope — ktx sent bad JSON or a mis-shaped payload.
# That is a ktx fault, so keep the generic non-zero code (JSONDecodeError
# subclasses ValueError, so this clause must precede the ValueError one).
sys.stderr.write(f"{error}\n")
return 1
except ValueError as error:
# The compute layer refused a well-formed request as invalid (e.g. the
# planner rejecting the agent's measures). Expected, not a fault.
sys.stderr.write(f"{error}\n")
return INPUT_REJECTED_EXIT_CODE
except Exception as error:
from ktx_daemon.telemetry import report_exception

View file

@ -6,7 +6,7 @@ import time
from typing import Any
from ktx_daemon.telemetry import error_class, report_exception, track_telemetry_event
from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict, Field, ValidationError
from semantic_layer.duplicate_check import validate_measure_duplicates
from semantic_layer.engine import SemanticEngine
from semantic_layer.models import QueryResult, SourceDefinition
@ -150,13 +150,18 @@ def query_semantic_layer(
track_telemetry_event(
"sql_gen_completed", sql_fields, project_id=request.project_id
)
report_exception(
error,
source="semantic-query",
handled=True,
fatal=False,
project_id=request.project_id,
)
# A ValueError is the engine refusing the caller's query — an expected
# rejection surfaced to the agent, not a ktx fault. Keep it out of Error
# Tracking (a pydantic ValidationError subclasses ValueError but means a
# malformed request envelope, so it stays a reported fault).
if not isinstance(error, ValueError) or isinstance(error, ValidationError):
report_exception(
error,
source="semantic-query",
handled=True,
fatal=False,
project_id=request.project_id,
)
raise

View file

@ -91,6 +91,22 @@ def test_command_returns_nonzero_for_invalid_json() -> None:
assert "Expecting property name enclosed in double quotes" in result.stderr
def test_semantic_query_rejects_invalid_query_with_input_rejected_code() -> None:
# A planner ValueError (agent's measure references no source) is an expected
# input rejection, distinguished from a fault by INPUT_REJECTED_EXIT_CODE (3).
result = run_daemon_command(
"semantic-query",
{
"sources": [ORDERS_SOURCE],
"dialect": "postgres",
"query": {"measures": ["count(*)"], "dimensions": []},
},
)
assert result.returncode == 3, result.stderr
assert "does not reference any source" in result.stderr
def test_serve_http_command_starts_uvicorn_without_reading_stdin(
monkeypatch,
) -> None:

View file

@ -125,7 +125,7 @@ def test_query_semantic_layer_emits_plan_and_sql_debug_events(
assert "public.orders" not in captured.err
def test_query_semantic_layer_reports_exception(monkeypatch) -> None:
def test_query_semantic_layer_reports_unexpected_fault(monkeypatch) -> None:
from ktx_daemon import semantic_layer as semantic_layer_module
reports: list[dict[str, object]] = []
@ -133,12 +133,16 @@ def test_query_semantic_layer_reports_exception(monkeypatch) -> None:
def fake_report(exception: BaseException, **kwargs: object) -> None:
reports.append({"exception": exception, **kwargs})
monkeypatch.setattr(semantic_layer_module, "report_exception", fake_report)
def boom(*_args: object, **_kwargs: object) -> None:
raise RuntimeError("engine construction failed")
with pytest.raises(ValueError):
monkeypatch.setattr(semantic_layer_module, "report_exception", fake_report)
monkeypatch.setattr(semantic_layer_module.SemanticEngine, "from_sources", boom)
with pytest.raises(RuntimeError):
query_semantic_layer(
SemanticLayerQueryRequest(
sources=[ORDERS_SOURCE, ORDERS_SOURCE],
sources=[ORDERS_SOURCE],
dialect="postgres",
projectId="a" * 64,
query={"measures": ["orders.order_count"]},
@ -152,6 +156,32 @@ def test_query_semantic_layer_reports_exception(monkeypatch) -> None:
assert reports[0]["project_id"] == "a" * 64
def test_query_semantic_layer_does_not_report_expected_query_rejection(
monkeypatch,
) -> None:
from ktx_daemon import semantic_layer as semantic_layer_module
reports: list[dict[str, object]] = []
monkeypatch.setattr(
semantic_layer_module,
"report_exception",
lambda *_args, **kwargs: reports.append(kwargs),
)
# A planner ValueError is the engine refusing the agent's query — surfaced to
# the caller and re-raised, but never filed as a ktx fault.
with pytest.raises(ValueError, match="does not reference any source"):
query_semantic_layer(
SemanticLayerQueryRequest(
sources=[ORDERS_SOURCE],
dialect="postgres",
query={"measures": ["count(*)"]},
)
)
assert reports == []
def test_semantic_layer_request_rejects_project_id_field_name() -> None:
with pytest.raises(ValueError):
SemanticLayerQueryRequest(