feat: emit ingest and scan telemetry

This commit is contained in:
Andrey Avtomonov 2026-05-22 15:57:55 +02:00
parent 1b7c9fcdba
commit 31efe1011d
5 changed files with 162 additions and 3 deletions

View file

@ -1,5 +1,9 @@
import { mkdtemp, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { buildDefaultKtxProjectConfig, type KtxProjectConfig } from './context/project/config.js';
import { describe, expect, it, vi } from 'vitest';
import { initKtxProject } from './context/project/project.js';
import { afterEach, describe, expect, it, vi } from 'vitest';
import {
buildPublicIngestPlan,
type KtxPublicIngestDeps,
@ -395,6 +399,10 @@ describe('buildPublicIngestPlan', () => {
});
describe('runKtxPublicIngest', () => {
afterEach(() => {
vi.unstubAllEnvs();
});
it('maps fast and deep database targets to scan internals', async () => {
const io = makeIo();
const project = deepReadyProject({
@ -423,6 +431,32 @@ describe('runKtxPublicIngest', () => {
);
});
it('emits debug telemetry for ingest targets and project snapshots without project paths', async () => {
vi.stubEnv('KTX_TELEMETRY_DEBUG', '1');
vi.stubEnv('CI', '');
const projectDir = await mkdtemp(join(tmpdir(), 'ktx-public-ingest-telemetry-'));
try {
await initKtxProject({ projectDir });
const io = makeIo({ isTTY: true });
const project = projectWithConnections({
warehouse: { driver: 'sqlite', path: join(projectDir, 'warehouse.sqlite') },
});
const code = await runKtxPublicIngest(
{ command: 'run', projectDir, targetConnectionId: 'warehouse', all: false, json: false, inputMode: 'disabled' },
io.io,
{ loadProject: vi.fn(async () => project), runScan: vi.fn(async () => 0) },
);
expect(code).toBe(0);
expect(io.stderr()).toContain('"event":"ingest_completed"');
expect(io.stderr()).toContain('"event":"project_stack_snapshot"');
expect(io.stderr()).not.toContain(projectDir);
} finally {
await rm(projectDir, { recursive: true, force: true });
}
});
it('runs query history after schema ingest with current-run window override', async () => {
const io = makeIo();
const runtimeIo = makeIo({ isTTY: true });

View file

@ -21,6 +21,8 @@ import { publicIngestOutputLine } from './public-ingest-copy.js';
import { resolvePublicIngestRuntimeRequirements } from './runtime-requirements.js';
import type { KtxScanArgs, KtxScanDeps } from './scan.js';
import { profileMark } from './startup-profile.js';
import { isDemoConnection } from './telemetry/demo-detect.js';
import { emitProjectStackSnapshot, emitTelemetryEvent } from './telemetry/index.js';
profileMark('module:public-ingest');
@ -603,6 +605,39 @@ function resultFailed(result: KtxPublicIngestTargetResult): boolean {
return result.steps.some((step) => step.status === 'failed');
}
function rowsBucket(): '<10k' | '<100k' | '<1M' | '<10M' | '>=10M' {
return '<10k';
}
async function emitIngestCompleted(input: {
args: Extract<KtxPublicIngestArgs, { command: 'run' }>;
project: KtxPublicIngestProject;
target: KtxPublicIngestPlanTarget;
result: KtxPublicIngestTargetResult;
startedAt: number;
io: KtxCliIo;
}): Promise<void> {
const failed = resultFailed(input.result);
await emitTelemetryEvent({
name: 'ingest_completed',
projectDir: input.args.projectDir,
io: input.io,
fields: {
driver: input.target.driver,
isDemoConnection: isDemoConnection(
input.target.connectionId,
input.project.config.connections[input.target.connectionId],
),
schemaCount: 0,
tableCount: 0,
columnCount: 0,
rowsBucket: rowsBucket(),
durationMs: Math.max(0, performance.now() - input.startedAt),
outcome: failed ? 'error' : 'ok',
},
});
}
function stepStatus(result: KtxPublicIngestTargetResult, operation: KtxPublicIngestStepName): string {
return result.steps.find((step) => step.operation === operation)?.status ?? 'not-run';
}
@ -928,7 +963,10 @@ export async function runKtxPublicIngest(
}
for (const target of plan.targets) {
results.push(await executePublicIngestTarget(target, args, io, deps));
const startedAt = performance.now();
const result = await executePublicIngestTarget(target, args, io, deps);
results.push(result);
await emitIngestCompleted({ args, project, target, result, startedAt, io });
}
if (args.json) {
@ -937,5 +975,7 @@ export async function runKtxPublicIngest(
renderPlainResults(results, io);
}
await emitProjectStackSnapshot({ projectDir: args.projectDir, io });
return results.some(resultFailed) ? 1 : 0;
}

View file

@ -317,6 +317,7 @@ describe('runKtxScan', () => {
});
afterEach(async () => {
vi.unstubAllEnvs();
await rm(tempDir, { recursive: true, force: true });
});
@ -381,6 +382,44 @@ describe('runKtxScan', () => {
expect(io.stdout()).not.toContain('/~');
});
it('emits debug telemetry for completed scans without project paths', async () => {
vi.stubEnv('KTX_TELEMETRY_DEBUG', '1');
vi.stubEnv('CI', '');
await initKtxProject({ projectDir: tempDir });
const runLocalScan = vi.fn(
async (): Promise<LocalScanRunResult> => ({
runId: 'scan-run-1',
status: 'done',
done: true,
connectionId: 'warehouse',
mode: 'structural',
dryRun: false,
syncId: 'sync-1',
report,
}),
);
const io = makeIo({ isTTY: true });
const code = await runKtxScan(
{
command: 'run',
projectDir: tempDir,
connectionId: 'warehouse',
mode: 'structural',
detectRelationships: false,
dryRun: false,
databaseIntrospectionUrl: 'http://127.0.0.1:8765',
},
io.io,
{ runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters },
);
expect(code).toBe(0);
expect(io.stderr()).toContain('"event":"scan_completed"');
expect(io.stderr()).toContain('"tableCount"');
expect(io.stderr()).not.toContain(tempDir);
});
it('passes KTX daemon options to local ingest adapters when no explicit daemon URL is set', async () => {
await initKtxProject({ projectDir: tempDir });
const createLocalIngestAdapters = vi.fn(() => []);

View file

@ -8,6 +8,8 @@ import { createKtxCliLocalIngestAdapters } from './local-adapters.js';
import { createKtxCliScanConnector } from './local-scan-connectors.js';
import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js';
import { profileMark } from './startup-profile.js';
import { emitTelemetryEvent } from './telemetry/index.js';
import { scrubErrorClass } from './telemetry/scrubber.js';
profileMark('module:scan');
@ -62,6 +64,14 @@ function totalTableCount(report: KtxScanReport): number {
return tableChangeCount(report) + report.diffSummary.tablesUnchanged;
}
function scanColumnCount(report: KtxScanReport): number {
return report.structuralSyncStats.columnsCreated + report.structuralSyncStats.columnsUpdated;
}
function inferredFkCount(report: KtxScanReport): number {
return report.relationships.accepted + report.relationships.review + report.relationships.rejected;
}
function writeScanIdentity(report: KtxScanReport, io: KtxCliIo): void {
io.stdout.write(`Run: ${report.runId}\n`);
io.stdout.write(`Connection: ${report.connectionId}\n`);
@ -311,6 +321,7 @@ export function createCliScanProgress(
}
export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps: KtxScanDeps = {}): Promise<number> {
const startedAt = performance.now();
try {
const project = await loadKtxProject({ projectDir: args.projectDir });
const resolveEmbeddingProvider = deps.resolveEmbeddingProvider ?? resolveProjectEmbeddingProvider;
@ -347,12 +358,42 @@ export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps
...(progress ? { progress } : {}),
});
cliProgress?.flush();
await emitTelemetryEvent({
name: 'scan_completed',
projectDir: args.projectDir,
io,
fields: {
driver: result.report.driver,
tableCount: totalTableCount(result.report),
columnCount: scanColumnCount(result.report),
inferredFkCount: inferredFkCount(result.report),
declaredFkCount: 0,
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'ok',
},
});
writeRunSummary(result.report, args.projectDir, io);
} finally {
cliProgress?.flush();
}
return 0;
} catch (error) {
const errorClass = scrubErrorClass(error);
await emitTelemetryEvent({
name: 'scan_completed',
projectDir: args.projectDir,
io,
fields: {
driver: 'unknown',
tableCount: 0,
columnCount: 0,
inferredFkCount: 0,
declaredFkCount: 0,
durationMs: Math.max(0, performance.now() - startedAt),
outcome: 'error',
...(errorClass ? { errorClass } : {}),
},
});
io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`);
return 1;
}

View file

@ -101,7 +101,12 @@ export async function emitProjectStackSnapshot(input: {
}
emittedProjectSnapshots.add(input.projectDir);
const project = await loadKtxProject({ projectDir: input.projectDir });
let project: Awaited<ReturnType<typeof loadKtxProject>>;
try {
project = await loadKtxProject({ projectDir: input.projectDir });
} catch {
return;
}
await emitTelemetryEvent({
name: 'project_stack_snapshot',
fields: await buildProjectStackSnapshotFields(project),