diff --git a/packages/cli/src/context-build-view.ts b/packages/cli/src/context-build-view.ts index f088097d..0ddd4922 100644 --- a/packages/cli/src/context-build-view.ts +++ b/packages/cli/src/context-build-view.ts @@ -997,7 +997,7 @@ export async function runContextBuild( let result: KtxPublicIngestTargetResult | null = null; let thrownError: unknown = null; try { - result = await execTarget(targetState.target, runArgs, capture.io, progressDeps); + result = await execTarget(targetState.target, runArgs, capture.io, progressDeps, project); } catch (error) { thrownError = error; } diff --git a/packages/cli/src/public-ingest.ts b/packages/cli/src/public-ingest.ts index 216d1d7b..7fc43ac4 100644 --- a/packages/cli/src/public-ingest.ts +++ b/packages/cli/src/public-ingest.ts @@ -862,11 +862,34 @@ function capturedFailureMessage(output: string): string | undefined { return [firstLine, ...followupLines].join('\n'); } +/** + * Run one ingest target through its scan/ingest steps. The single per-target + * chokepoint reached by every entrypoint — standalone `ktx ingest` (plain/json + * and foreground) and `ktx setup` (via `runContextBuild`). The exported + * `executePublicIngestTarget` wraps this and emits the `ingest_completed` + * telemetry event exactly once, so every path is counted. + */ export async function executePublicIngestTarget( target: KtxPublicIngestPlanTarget, args: Extract, io: KtxCliIo, deps: KtxPublicIngestDeps, + project: KtxPublicIngestProject, +): Promise { + const startedAt = performance.now(); + const result = await runIngestTargetSteps(target, args, io, deps); + // `io` may be a capture buffer for the scan/ingest step output; the telemetry + // debug echo belongs on the real user-facing stream, which callers expose as + // `deps.runtimeIo` (falling back to `io` when the step io is already real). + await emitIngestCompleted({ args, project, target, result, startedAt, io: deps.runtimeIo ?? io }); + return result; +} + +async function runIngestTargetSteps( + target: KtxPublicIngestPlanTarget, + args: Extract, + io: KtxCliIo, + deps: KtxPublicIngestDeps, ): Promise { if (target.preflightFailure) { if (target.operation === 'database-ingest') { @@ -1086,11 +1109,8 @@ export async function runKtxPublicIngest( } for (const [index, target] of plan.targets.entries()) { - const startedAt = performance.now(); if (args.json) { - const result = await executePublicIngestTarget(target, args, io, deps); - results.push(result); - await emitIngestCompleted({ args, project, target, result, startedAt, io }); + results.push(await executePublicIngestTarget(target, args, io, deps, project)); continue; } @@ -1108,9 +1128,7 @@ export async function runKtxPublicIngest( onPhaseEnd: progress.onPhaseEnd, runtimeIo: deps.runtimeIo ?? io, }; - const result = await executePublicIngestTarget(target, args, capture, targetDeps); - results.push(result); - await emitIngestCompleted({ args, project, target, result, startedAt, io }); + results.push(await executePublicIngestTarget(target, args, capture, targetDeps, project)); } if (args.json) { diff --git a/packages/cli/test/context-build-view.test.ts b/packages/cli/test/context-build-view.test.ts index 40e33606..d8692eb5 100644 --- a/packages/cli/test/context-build-view.test.ts +++ b/packages/cli/test/context-build-view.test.ts @@ -984,6 +984,7 @@ describe('runContextBuild', () => { scanProgress: expect.anything(), ingestProgress: expect.any(Function), }), + project, ); }); @@ -1015,6 +1016,7 @@ describe('runContextBuild', () => { expect.objectContaining({ runtimeIo: io.io, }), + project, ); }); diff --git a/packages/cli/test/public-ingest.test.ts b/packages/cli/test/public-ingest.test.ts index 549756eb..2c27593e 100644 --- a/packages/cli/test/public-ingest.test.ts +++ b/packages/cli/test/public-ingest.test.ts @@ -6,11 +6,17 @@ import { initKtxProject } from '../src/context/project/project.js'; import { afterEach, describe, expect, it, vi } from 'vitest'; import { buildPublicIngestPlan, + executePublicIngestTarget, type KtxPublicIngestDeps, type KtxPublicIngestProject, publicProgressMessage, runKtxPublicIngest, } from '../src/public-ingest.js'; + +/** Count non-overlapping occurrences of `needle` in `haystack`. */ +function occurrences(haystack: string, needle: string): number { + return haystack.split(needle).length - 1; +} import type { ManagedPythonCommandRuntime } from '../src/managed-python-command.js'; function makeIo(options: { isTTY?: boolean; interactive?: boolean } = {}) { @@ -457,6 +463,58 @@ describe('runKtxPublicIngest', () => { } }); + it('emits exactly one ingest_completed from the shared executePublicIngestTarget chokepoint', async () => { + // executePublicIngestTarget is the single per-target path reached by every + // entrypoint (plain/json ingest, foreground ingest via runContextBuild, and + // setup). Emitting here is what makes ingest_completed fire on every path. + vi.stubEnv('KTX_TELEMETRY_DEBUG', '1'); + vi.stubEnv('CI', ''); + const io = makeIo({ isTTY: true }); + const project = deepReadyProject({ warehouse: { driver: 'postgres' } }); + const [target] = buildPublicIngestPlan(project, { + projectDir: '/tmp/project', + targetConnectionId: 'warehouse', + all: false, + }).targets; + + const result = await executePublicIngestTarget( + target, + { command: 'run', projectDir: '/tmp/project', targetConnectionId: 'warehouse', all: false, json: false, inputMode: 'disabled' }, + io.io, + { runScan: vi.fn(async () => 0) }, + project, + ); + + expect(result.steps.some((step) => step.status === 'failed')).toBe(false); + expect(occurrences(io.stderr(), '"event":"ingest_completed"')).toBe(1); + expect(io.stderr()).toContain('"outcome":"ok"'); + }); + + it('emits one ingest_completed per target and never double-emits across a multi-target run', async () => { + vi.stubEnv('KTX_TELEMETRY_DEBUG', '1'); + vi.stubEnv('CI', ''); + const projectDir = await mkdtemp(join(tmpdir(), 'ktx-public-ingest-no-double-')); + try { + await initKtxProject({ projectDir }); + const io = makeIo({ isTTY: true }); + const project = deepReadyProject({ + first: { driver: 'sqlite', path: join(projectDir, 'first.sqlite') }, + second: { driver: 'sqlite', path: join(projectDir, 'second.sqlite') }, + }); + + const code = await runKtxPublicIngest( + { command: 'run', projectDir, all: true, json: false, inputMode: 'disabled' }, + io.io, + { loadProject: vi.fn(async () => project), runScan: vi.fn(async () => 0) }, + ); + + expect(code).toBe(0); + expect(occurrences(io.stderr(), '"event":"ingest_completed"')).toBe(2); + } finally { + await rm(projectDir, { recursive: true, force: true }); + } + }); + it('runs query history after schema ingest with current-run window override', async () => { const io = makeIo(); const runtimeIo = makeIo({ isTTY: true });