Emit ingest_completed once per target on every ingest path

emitIngestCompleted was called only in runKtxPublicIngest's plain/json loop,
so the foreground 'ktx ingest' view and all of 'ktx setup' — which delegate to
runContextBuild -> executePublicIngestTarget — never emitted the event. That
left ingest_completed near-useless for measuring ingestion.

Move the emit into executePublicIngestTarget, the single per-target chokepoint
every entrypoint funnels through: a thin wrapper now captures timing, runs the
existing steps (extracted to runIngestTargetSteps), and emits exactly once. The
telemetry echo targets deps.runtimeIo (the real user stream) so a capture
buffer used for step output doesn't swallow it. Thread project through the
context-build call site. No schema/field changes, so Node<->Python telemetry
parity is unaffected.

Add tests: the shared chokepoint emits exactly one ingest_completed for any
caller, and a multi-target run emits one per target with no double-emit.
This commit is contained in:
Andrey Avtomonov 2026-06-02 20:03:27 +02:00
parent 6da8c3452a
commit 2334a4b6e3
4 changed files with 86 additions and 8 deletions

View file

@ -997,7 +997,7 @@ export async function runContextBuild(
let result: KtxPublicIngestTargetResult | null = null;
let thrownError: unknown = null;
try {
result = await execTarget(targetState.target, runArgs, capture.io, progressDeps);
result = await execTarget(targetState.target, runArgs, capture.io, progressDeps, project);
} catch (error) {
thrownError = error;
}

View file

@ -862,11 +862,34 @@ function capturedFailureMessage(output: string): string | undefined {
return [firstLine, ...followupLines].join('\n');
}
/**
* Run one ingest target through its scan/ingest steps. The single per-target
* chokepoint reached by every entrypoint standalone `ktx ingest` (plain/json
* and foreground) and `ktx setup` (via `runContextBuild`). The exported
* `executePublicIngestTarget` wraps this and emits the `ingest_completed`
* telemetry event exactly once, so every path is counted.
*/
export async function executePublicIngestTarget(
target: KtxPublicIngestPlanTarget,
args: Extract<KtxPublicIngestArgs, { command: 'run' }>,
io: KtxCliIo,
deps: KtxPublicIngestDeps,
project: KtxPublicIngestProject,
): Promise<KtxPublicIngestTargetResult> {
const startedAt = performance.now();
const result = await runIngestTargetSteps(target, args, io, deps);
// `io` may be a capture buffer for the scan/ingest step output; the telemetry
// debug echo belongs on the real user-facing stream, which callers expose as
// `deps.runtimeIo` (falling back to `io` when the step io is already real).
await emitIngestCompleted({ args, project, target, result, startedAt, io: deps.runtimeIo ?? io });
return result;
}
async function runIngestTargetSteps(
target: KtxPublicIngestPlanTarget,
args: Extract<KtxPublicIngestArgs, { command: 'run' }>,
io: KtxCliIo,
deps: KtxPublicIngestDeps,
): Promise<KtxPublicIngestTargetResult> {
if (target.preflightFailure) {
if (target.operation === 'database-ingest') {
@ -1086,11 +1109,8 @@ export async function runKtxPublicIngest(
}
for (const [index, target] of plan.targets.entries()) {
const startedAt = performance.now();
if (args.json) {
const result = await executePublicIngestTarget(target, args, io, deps);
results.push(result);
await emitIngestCompleted({ args, project, target, result, startedAt, io });
results.push(await executePublicIngestTarget(target, args, io, deps, project));
continue;
}
@ -1108,9 +1128,7 @@ export async function runKtxPublicIngest(
onPhaseEnd: progress.onPhaseEnd,
runtimeIo: deps.runtimeIo ?? io,
};
const result = await executePublicIngestTarget(target, args, capture, targetDeps);
results.push(result);
await emitIngestCompleted({ args, project, target, result, startedAt, io });
results.push(await executePublicIngestTarget(target, args, capture, targetDeps, project));
}
if (args.json) {

View file

@ -984,6 +984,7 @@ describe('runContextBuild', () => {
scanProgress: expect.anything(),
ingestProgress: expect.any(Function),
}),
project,
);
});
@ -1015,6 +1016,7 @@ describe('runContextBuild', () => {
expect.objectContaining({
runtimeIo: io.io,
}),
project,
);
});

View file

@ -6,11 +6,17 @@ import { initKtxProject } from '../src/context/project/project.js';
import { afterEach, describe, expect, it, vi } from 'vitest';
import {
buildPublicIngestPlan,
executePublicIngestTarget,
type KtxPublicIngestDeps,
type KtxPublicIngestProject,
publicProgressMessage,
runKtxPublicIngest,
} from '../src/public-ingest.js';
/** Count non-overlapping occurrences of `needle` in `haystack`. */
function occurrences(haystack: string, needle: string): number {
return haystack.split(needle).length - 1;
}
import type { ManagedPythonCommandRuntime } from '../src/managed-python-command.js';
function makeIo(options: { isTTY?: boolean; interactive?: boolean } = {}) {
@ -457,6 +463,58 @@ describe('runKtxPublicIngest', () => {
}
});
it('emits exactly one ingest_completed from the shared executePublicIngestTarget chokepoint', async () => {
// executePublicIngestTarget is the single per-target path reached by every
// entrypoint (plain/json ingest, foreground ingest via runContextBuild, and
// setup). Emitting here is what makes ingest_completed fire on every path.
vi.stubEnv('KTX_TELEMETRY_DEBUG', '1');
vi.stubEnv('CI', '');
const io = makeIo({ isTTY: true });
const project = deepReadyProject({ warehouse: { driver: 'postgres' } });
const [target] = buildPublicIngestPlan(project, {
projectDir: '/tmp/project',
targetConnectionId: 'warehouse',
all: false,
}).targets;
const result = await executePublicIngestTarget(
target,
{ command: 'run', projectDir: '/tmp/project', targetConnectionId: 'warehouse', all: false, json: false, inputMode: 'disabled' },
io.io,
{ runScan: vi.fn(async () => 0) },
project,
);
expect(result.steps.some((step) => step.status === 'failed')).toBe(false);
expect(occurrences(io.stderr(), '"event":"ingest_completed"')).toBe(1);
expect(io.stderr()).toContain('"outcome":"ok"');
});
it('emits one ingest_completed per target and never double-emits across a multi-target run', async () => {
vi.stubEnv('KTX_TELEMETRY_DEBUG', '1');
vi.stubEnv('CI', '');
const projectDir = await mkdtemp(join(tmpdir(), 'ktx-public-ingest-no-double-'));
try {
await initKtxProject({ projectDir });
const io = makeIo({ isTTY: true });
const project = deepReadyProject({
first: { driver: 'sqlite', path: join(projectDir, 'first.sqlite') },
second: { driver: 'sqlite', path: join(projectDir, 'second.sqlite') },
});
const code = await runKtxPublicIngest(
{ command: 'run', projectDir, all: true, json: false, inputMode: 'disabled' },
io.io,
{ loadProject: vi.fn(async () => project), runScan: vi.fn(async () => 0) },
);
expect(code).toBe(0);
expect(occurrences(io.stderr(), '"event":"ingest_completed"')).toBe(2);
} finally {
await rm(projectDir, { recursive: true, force: true });
}
});
it('runs query history after schema ingest with current-run window override', async () => {
const io = makeIo();
const runtimeIo = makeIo({ isTTY: true });