diff --git a/packages/cli/src/public-ingest.test.ts b/packages/cli/src/public-ingest.test.ts index ac2560cd..1f5dd67d 100644 --- a/packages/cli/src/public-ingest.test.ts +++ b/packages/cli/src/public-ingest.test.ts @@ -1,5 +1,9 @@ +import { mkdtemp, rm } from 'node:fs/promises'; +import { tmpdir } from 'node:os'; +import { join } from 'node:path'; import { buildDefaultKtxProjectConfig, type KtxProjectConfig } from './context/project/config.js'; -import { describe, expect, it, vi } from 'vitest'; +import { initKtxProject } from './context/project/project.js'; +import { afterEach, describe, expect, it, vi } from 'vitest'; import { buildPublicIngestPlan, type KtxPublicIngestDeps, @@ -395,6 +399,10 @@ describe('buildPublicIngestPlan', () => { }); describe('runKtxPublicIngest', () => { + afterEach(() => { + vi.unstubAllEnvs(); + }); + it('maps fast and deep database targets to scan internals', async () => { const io = makeIo(); const project = deepReadyProject({ @@ -423,6 +431,32 @@ describe('runKtxPublicIngest', () => { ); }); + it('emits debug telemetry for ingest targets and project snapshots without project paths', async () => { + vi.stubEnv('KTX_TELEMETRY_DEBUG', '1'); + vi.stubEnv('CI', ''); + const projectDir = await mkdtemp(join(tmpdir(), 'ktx-public-ingest-telemetry-')); + try { + await initKtxProject({ projectDir }); + const io = makeIo({ isTTY: true }); + const project = projectWithConnections({ + warehouse: { driver: 'sqlite', path: join(projectDir, 'warehouse.sqlite') }, + }); + + const code = await runKtxPublicIngest( + { command: 'run', projectDir, targetConnectionId: 'warehouse', all: false, json: false, inputMode: 'disabled' }, + io.io, + { loadProject: vi.fn(async () => project), runScan: vi.fn(async () => 0) }, + ); + + expect(code).toBe(0); + expect(io.stderr()).toContain('"event":"ingest_completed"'); + expect(io.stderr()).toContain('"event":"project_stack_snapshot"'); + expect(io.stderr()).not.toContain(projectDir); + } finally { + await rm(projectDir, { recursive: true, force: true }); + } + }); + it('runs query history after schema ingest with current-run window override', async () => { const io = makeIo(); const runtimeIo = makeIo({ isTTY: true }); diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index 60b9622c..498edb3a 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -21,6 +21,8 @@ import { publicIngestOutputLine } from './public-ingest-copy.js'; import { resolvePublicIngestRuntimeRequirements } from './runtime-requirements.js'; import type { KtxScanArgs, KtxScanDeps } from './scan.js'; import { profileMark } from './startup-profile.js'; +import { isDemoConnection } from './telemetry/demo-detect.js'; +import { emitProjectStackSnapshot, emitTelemetryEvent } from './telemetry/index.js'; profileMark('module:public-ingest'); @@ -603,6 +605,39 @@ function resultFailed(result: KtxPublicIngestTargetResult): boolean { return result.steps.some((step) => step.status === 'failed'); } +function rowsBucket(): '<10k' | '<100k' | '<1M' | '<10M' | '>=10M' { + return '<10k'; +} + +async function emitIngestCompleted(input: { + args: Extract; + project: KtxPublicIngestProject; + target: KtxPublicIngestPlanTarget; + result: KtxPublicIngestTargetResult; + startedAt: number; + io: KtxCliIo; +}): Promise { + const failed = resultFailed(input.result); + await emitTelemetryEvent({ + name: 'ingest_completed', + projectDir: input.args.projectDir, + io: input.io, + fields: { + driver: input.target.driver, + isDemoConnection: isDemoConnection( + input.target.connectionId, + input.project.config.connections[input.target.connectionId], + ), + schemaCount: 0, + tableCount: 0, + columnCount: 0, + rowsBucket: rowsBucket(), + durationMs: Math.max(0, performance.now() - input.startedAt), + outcome: failed ? 'error' : 'ok', + }, + }); +} + function stepStatus(result: KtxPublicIngestTargetResult, operation: KtxPublicIngestStepName): string { return result.steps.find((step) => step.operation === operation)?.status ?? 'not-run'; } @@ -928,7 +963,10 @@ export async function runKtxPublicIngest( } for (const target of plan.targets) { - results.push(await executePublicIngestTarget(target, args, io, deps)); + const startedAt = performance.now(); + const result = await executePublicIngestTarget(target, args, io, deps); + results.push(result); + await emitIngestCompleted({ args, project, target, result, startedAt, io }); } if (args.json) { @@ -937,5 +975,7 @@ export async function runKtxPublicIngest( renderPlainResults(results, io); } + await emitProjectStackSnapshot({ projectDir: args.projectDir, io }); + return results.some(resultFailed) ? 1 : 0; } diff --git a/packages/cli/src/scan.test.ts b/packages/cli/src/scan.test.ts index 0d2bcdc9..16cfdbd3 100644 --- a/packages/cli/src/scan.test.ts +++ b/packages/cli/src/scan.test.ts @@ -317,6 +317,7 @@ describe('runKtxScan', () => { }); afterEach(async () => { + vi.unstubAllEnvs(); await rm(tempDir, { recursive: true, force: true }); }); @@ -381,6 +382,44 @@ describe('runKtxScan', () => { expect(io.stdout()).not.toContain('/~'); }); + it('emits debug telemetry for completed scans without project paths', async () => { + vi.stubEnv('KTX_TELEMETRY_DEBUG', '1'); + vi.stubEnv('CI', ''); + await initKtxProject({ projectDir: tempDir }); + const runLocalScan = vi.fn( + async (): Promise => ({ + runId: 'scan-run-1', + status: 'done', + done: true, + connectionId: 'warehouse', + mode: 'structural', + dryRun: false, + syncId: 'sync-1', + report, + }), + ); + const io = makeIo({ isTTY: true }); + + const code = await runKtxScan( + { + command: 'run', + projectDir: tempDir, + connectionId: 'warehouse', + mode: 'structural', + detectRelationships: false, + dryRun: false, + databaseIntrospectionUrl: 'http://127.0.0.1:8765', + }, + io.io, + { runLocalScan, createLocalIngestAdapters: noLocalIngestAdapters }, + ); + + expect(code).toBe(0); + expect(io.stderr()).toContain('"event":"scan_completed"'); + expect(io.stderr()).toContain('"tableCount"'); + expect(io.stderr()).not.toContain(tempDir); + }); + it('passes KTX daemon options to local ingest adapters when no explicit daemon URL is set', async () => { await initKtxProject({ projectDir: tempDir }); const createLocalIngestAdapters = vi.fn(() => []); diff --git a/packages/cli/src/scan.ts b/packages/cli/src/scan.ts index a92aaa62..f40da497 100644 --- a/packages/cli/src/scan.ts +++ b/packages/cli/src/scan.ts @@ -8,6 +8,8 @@ import { createKtxCliLocalIngestAdapters } from './local-adapters.js'; import { createKtxCliScanConnector } from './local-scan-connectors.js'; import type { KtxManagedPythonInstallPolicy } from './managed-python-command.js'; import { profileMark } from './startup-profile.js'; +import { emitTelemetryEvent } from './telemetry/index.js'; +import { scrubErrorClass } from './telemetry/scrubber.js'; profileMark('module:scan'); @@ -62,6 +64,14 @@ function totalTableCount(report: KtxScanReport): number { return tableChangeCount(report) + report.diffSummary.tablesUnchanged; } +function scanColumnCount(report: KtxScanReport): number { + return report.structuralSyncStats.columnsCreated + report.structuralSyncStats.columnsUpdated; +} + +function inferredFkCount(report: KtxScanReport): number { + return report.relationships.accepted + report.relationships.review + report.relationships.rejected; +} + function writeScanIdentity(report: KtxScanReport, io: KtxCliIo): void { io.stdout.write(`Run: ${report.runId}\n`); io.stdout.write(`Connection: ${report.connectionId}\n`); @@ -311,6 +321,7 @@ export function createCliScanProgress( } export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps: KtxScanDeps = {}): Promise { + const startedAt = performance.now(); try { const project = await loadKtxProject({ projectDir: args.projectDir }); const resolveEmbeddingProvider = deps.resolveEmbeddingProvider ?? resolveProjectEmbeddingProvider; @@ -347,12 +358,42 @@ export async function runKtxScan(args: KtxScanArgs, io: KtxCliIo = process, deps ...(progress ? { progress } : {}), }); cliProgress?.flush(); + await emitTelemetryEvent({ + name: 'scan_completed', + projectDir: args.projectDir, + io, + fields: { + driver: result.report.driver, + tableCount: totalTableCount(result.report), + columnCount: scanColumnCount(result.report), + inferredFkCount: inferredFkCount(result.report), + declaredFkCount: 0, + durationMs: Math.max(0, performance.now() - startedAt), + outcome: 'ok', + }, + }); writeRunSummary(result.report, args.projectDir, io); } finally { cliProgress?.flush(); } return 0; } catch (error) { + const errorClass = scrubErrorClass(error); + await emitTelemetryEvent({ + name: 'scan_completed', + projectDir: args.projectDir, + io, + fields: { + driver: 'unknown', + tableCount: 0, + columnCount: 0, + inferredFkCount: 0, + declaredFkCount: 0, + durationMs: Math.max(0, performance.now() - startedAt), + outcome: 'error', + ...(errorClass ? { errorClass } : {}), + }, + }); io.stderr.write(`${error instanceof Error ? error.message : String(error)}\n`); return 1; } diff --git a/packages/cli/src/telemetry/index.ts b/packages/cli/src/telemetry/index.ts index c4d7953f..2e6e2cf9 100644 --- a/packages/cli/src/telemetry/index.ts +++ b/packages/cli/src/telemetry/index.ts @@ -101,7 +101,12 @@ export async function emitProjectStackSnapshot(input: { } emittedProjectSnapshots.add(input.projectDir); - const project = await loadKtxProject({ projectDir: input.projectDir }); + let project: Awaited>; + try { + project = await loadKtxProject({ projectDir: input.projectDir }); + } catch { + return; + } await emitTelemetryEvent({ name: 'project_stack_snapshot', fields: await buildProjectStackSnapshotFields(project),