feat(cli): profile ingest runs and split model vs tool time (#249)

* feat(cli): profile ingest runs to find where wall-clock time goes

Add opt-in profiling for `ktx ingest`. Each timed phase, work unit, and
agent loop now records durationMs / step count / token usage in the
trace, and a post-run aggregator rolls them up into a "where did the
time go" report printed to stderr.

Enable per run with KTX_PROFILE_INGEST (1/true -> human table, json ->
raw structured profile) or persistently via `ingest.profile` in
ktx.yaml. The json form emits raw milliseconds, token counts, and a
summary.headline one-line diagnosis so coding agents can parse it
directly; json wins when both env and config request profiling.

- runtime-port: RunLoopMetrics (totalMs, usage, stepCount,
  stepBoundariesMs) plus onMetrics callbacks on text/object generation
- ai-sdk + claude-code runtimes: capture per-loop timing and token usage
- work-unit-executor and stages 3/4: thread metrics into trace events
- ingest-bundle.runner: time worktree / triage / clustering / index /
  reconcile / squash phases and emit the profile in a finally block
  (best-effort; never affects the run outcome)
- ingest-profile: new trace+transcript aggregator with table/json formatters
- config: ingest.profile flag; docs: profiling section in ktx-ingest.mdx

* fix(cli): flush tool-call logs before reading ingest profile

Tool transcripts are appended fire-and-forget so the agent hot path never
blocks on logging. The ingest profiler read them before the writes settled,
so per-work-unit toolMs (and the model-vs-tool split derived from it) could
be incomplete. Track in-flight appends and expose flushToolCallLogs() —
bounded by a timeout so it can never hang — and flush before the profiler
reads the transcript.
This commit is contained in:
Andrey Avtomonov 2026-06-01 15:49:17 +02:00 committed by GitHub
parent 22ddf5524c
commit 21744fc520
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 1243 additions and 56 deletions

View file

@ -25,6 +25,7 @@ import {
deriveFinalizationWikiPageKeys,
} from './finalization-scope.js';
import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js';
import { formatIngestProfile, formatIngestProfileJson, readIngestProfile, resolveIngestProfileMode } from './ingest-profile.js';
import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js';
import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolver.js';
import { runIsolatedWorkUnit } from './isolated-diff/work-unit-executor.js';
@ -69,7 +70,7 @@ import { createEvictionListTool } from './tools/eviction-list.tool.js';
import { createReadRawSpanTool } from './tools/read-raw-span.tool.js';
import { createStageDiffTool } from './tools/stage-diff.tool.js';
import { createStageListTool } from './tools/stage-list.tool.js';
import { type ToolCallLogEntry, wrapToolsWithLogger } from './tools/tool-call-logger.js';
import { flushToolCallLogs, type ToolCallLogEntry, wrapToolsWithLogger } from './tools/tool-call-logger.js';
import {
createMutableToolTranscriptSummary,
recordToolTranscriptEntry,
@ -239,6 +240,41 @@ export class IngestBundleRunner {
} catch (error) {
ctx?.memoryFlow?.finish('error', [sanitizeMemoryFlowError(error)]);
throw error;
} finally {
await this.maybeEmitIngestProfile(job.jobId);
}
}
/**
* When profiling is enabled via the `KTX_PROFILE_INGEST` env var or the
* `ingest.profile` config setting read the job's trace + tool transcripts
* and print a rolled-up timing breakdown to stderr. `json` emits the raw
* structured profile for coding agents; `table` emits a human summary.
* Best-effort: profiling never affects the run outcome.
*/
private async maybeEmitIngestProfile(jobId: string): Promise<void> {
const mode = resolveIngestProfileMode(this.deps.settings.profileIngest);
if (mode === 'off') {
return;
}
try {
// Tool transcripts are appended fire-and-forget; flush them so per-work-unit
// toolMs (and the derived model-vs-tool split) is complete before we read.
await flushToolCallLogs();
const storage = this.deps.storage as typeof this.deps.storage & {
resolveTracePath?: (jobId: string) => string;
};
const profile = await readIngestProfile(jobId, {
tracePath: storage.resolveTracePath?.(jobId) ?? ingestTracePathForJob(this.deps.storage.homeDir, jobId),
transcriptDir: this.deps.storage.resolveTranscriptDir(jobId),
});
process.stderr.write(`\n${mode === 'json' ? formatIngestProfileJson(profile) : formatIngestProfile(profile)}`);
} catch (error) {
this.logger.warn(
`[ingest-bundle] ingest profile unavailable for job=${jobId}: ${
error instanceof Error ? error.message : String(error)
}`,
);
}
}
@ -1100,8 +1136,15 @@ export class IngestBundleRunner {
const scopeDescriptor = adapter.describeScope ? await adapter.describeScope(stagedDir) : null;
const sessionWorktree = await this.deps.lockingService.withLock('config:repo', () =>
this.deps.sessionWorktreeService.create(job.jobId, baseSha),
const sessionWorktree = await traceTimed(
trace,
'worktree',
'session_worktree_created',
{ jobId: job.jobId },
() =>
this.deps.lockingService.withLock('config:repo', () =>
this.deps.sessionWorktreeService.create(job.jobId, baseSha),
),
);
let cleanupOutcome: 'success' | 'crash' | 'conflict' = 'crash';
@ -1272,26 +1315,34 @@ export class IngestBundleRunner {
sourceContextReport = chunk.contextReport;
parseArtifacts = chunk.parseArtifacts;
reconcileNotes = chunk.reconcileNotes ?? [];
const pageTriage = this.deps.pageTriage;
const triageRunId = runRow.id;
triageResult =
contextReport && adapter.triageSupported && this.deps.pageTriage
? await this.deps.pageTriage.triageRun({
stagedDir,
runId: runRow.id,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
jobId: job.jobId,
diffSet,
adapter,
})
contextReport && adapter.triageSupported && pageTriage
? await traceTimed(runTrace, 'triage', 'page_triage', { sourceKey: job.sourceKey }, () =>
pageTriage.triageRun({
stagedDir,
runId: triageRunId,
connectionId: job.connectionId,
sourceKey: job.sourceKey,
syncId,
jobId: job.jobId,
diffSet,
adapter,
}),
)
: null;
workUnits = this.filterWorkUnitsForTriage(workUnits, triageResult);
if (adapter.clusterWorkUnits && workUnits.length > 0) {
workUnits = await adapter.clusterWorkUnits({
workUnits,
stagedDir,
embedding: this.deps.embedding,
});
const clusterWorkUnits = adapter.clusterWorkUnits;
if (clusterWorkUnits && workUnits.length > 0) {
const preClusterCount = workUnits.length;
workUnits = await traceTimed(
runTrace,
'clustering',
'cluster_work_units',
{ workUnitCount: preClusterCount },
() => clusterWorkUnits({ workUnits, stagedDir, embedding: this.deps.embedding }),
);
}
await stage2?.updateProgress(1.0, `Planned ${workUnits.length} update${workUnits.length === 1 ? '' : 's'}`);
}
@ -1326,7 +1377,13 @@ export class IngestBundleRunner {
});
// Build shared per-job context.
const [wikiIndex, slIndex] = await Promise.all([this.buildWikiIndex(), this.buildSlIndex(slConnectionIds)]);
const [wikiIndex, slIndex] = await traceTimed(
runTrace,
'index_build',
'build_indexes',
{ connectionCount: slConnectionIds.length },
() => Promise.all([this.buildWikiIndex(), this.buildSlIndex(slConnectionIds)]),
);
const baseFraming = await this.deps.promptService.loadPrompt('memory_agent_bundle_ingest_work_unit');
const wuSkillNames = Array.from(
@ -1881,6 +1938,8 @@ export class IngestBundleRunner {
let curatorWarnings: string[] = [];
let reconcileOutcome: Awaited<ReturnType<typeof runReconciliationStage4>>;
const reconcileStartedAt = Date.now();
const reconcileMode = contextReport && this.deps.curatorPagination ? 'curator' : 'single';
if (contextReport && this.deps.curatorPagination) {
const curatorOutcome = await this.deps.curatorPagination.reconcile({
runId: runRow.id,
@ -1989,6 +2048,33 @@ export class IngestBundleRunner {
: undefined,
});
}
await runTrace.event(
'debug',
'reconciliation',
'reconciliation_executed',
{
mode: reconcileMode,
skipped: reconcileOutcome.skipped,
...(reconcileOutcome.stopReason ? { stopReason: reconcileOutcome.stopReason } : {}),
...(reconcileOutcome.metrics
? {
agentLoopMs: reconcileOutcome.metrics.totalMs,
stepCount: reconcileOutcome.metrics.stepCount,
...(reconcileOutcome.metrics.usage.inputTokens !== undefined
? { inputTokens: reconcileOutcome.metrics.usage.inputTokens }
: {}),
...(reconcileOutcome.metrics.usage.outputTokens !== undefined
? { outputTokens: reconcileOutcome.metrics.usage.outputTokens }
: {}),
...(reconcileOutcome.metrics.usage.totalTokens !== undefined
? { totalTokens: reconcileOutcome.metrics.usage.totalTokens }
: {}),
}
: {}),
},
undefined,
Date.now() - reconcileStartedAt,
);
latestReconciliationSkipped = reconcileOutcome.skipped;
const danglingReconcileWikiRefs = await findDanglingWikiRefsForActions({
@ -2036,6 +2122,7 @@ export class IngestBundleRunner {
activePhase = 'finalization';
if (adapter.finalize) {
const stageFinalization = ctx?.startPhase(0.04);
const finalizationStartedAt = Date.now();
emitStageProgress('finalization', 87, 'Running deterministic finalization');
await stageFinalization?.updateProgress(0.0, 'Running deterministic finalization');
await runTrace.event('debug', 'finalization', 'finalization_started', { sourceKey: job.sourceKey });
@ -2215,14 +2302,21 @@ export class IngestBundleRunner {
latestFinalizationOutcome = finalizationOutcome;
emitStageProgress('finalization', 88, 'Deterministic finalization complete');
await stageFinalization?.updateProgress(1.0, 'Deterministic finalization complete');
await runTrace.event('debug', 'finalization', 'finalization_committed', {
sourceKey: job.sourceKey,
commitSha: finalizationSha,
touchedPaths: finalizationTouchedPaths,
touchedSources: finalizationTouchedSources,
changedWikiPageKeys: finalizationChangedWikiPageKeys,
warnings: result.warnings,
});
await runTrace.event(
'debug',
'finalization',
'finalization_committed',
{
sourceKey: job.sourceKey,
commitSha: finalizationSha,
touchedPaths: finalizationTouchedPaths,
touchedSources: finalizationTouchedSources,
changedWikiPageKeys: finalizationChangedWikiPageKeys,
warnings: result.warnings,
},
undefined,
Date.now() - finalizationStartedAt,
);
} else {
await runTrace.event('debug', 'finalization', 'finalization_skipped', { sourceKey: job.sourceKey });
}
@ -2504,6 +2598,7 @@ export class IngestBundleRunner {
const stage6 = ctx?.startPhase(0.04);
emitStageProgress('save', 91, 'Saving changes');
await stage6?.updateProgress(0.0, 'Saving changes');
const squashStartedAt = Date.now();
try {
await sessionWorktree.git.assertWorktreeClean();
} catch (error) {
@ -2527,10 +2622,17 @@ export class IngestBundleRunner {
throw new Error(`squash merge conflict: ${mergeResult.conflictPaths.join(', ')}`);
}
const commitSha = mergeResult.touchedPaths.length === 0 ? null : mergeResult.squashSha;
await runTrace.event('debug', 'squash', 'squash_finished', {
commitSha,
touchedPaths: mergeResult.touchedPaths,
});
await runTrace.event(
'debug',
'squash',
'squash_finished',
{
commitSha,
touchedPaths: mergeResult.touchedPaths,
},
undefined,
Date.now() - squashStartedAt,
);
const memoryFlowSavedActions = stageIndex.workUnits
.flatMap((wu) => wu.actions)
.concat(reconcileActions)
@ -2547,6 +2649,7 @@ export class IngestBundleRunner {
// transaction. If this throws, the run fails and no partial index state
// survives (thanks to the transactional upsert in applyDiffTransactional).
if (commitSha) {
const indexSyncStartedAt = Date.now();
// Multi-file squash → omit path so the handler diffs the whole commit
// (a comma-joined pathspec would match nothing and the job would no-op).
const pathFilter = mergeResult.touchedPaths.length === 1 ? mergeResult.touchedPaths[0] : '';
@ -2571,6 +2674,14 @@ export class IngestBundleRunner {
);
}
}
await runTrace.event(
'debug',
'index_sync',
'post_squash_index_sync_finished',
{ connectionCount: touchedConnections.length },
undefined,
Date.now() - indexSyncStartedAt,
);
}
const stage5 = ctx?.startPhase(0.04);

View file

@ -0,0 +1,437 @@
import { readdir, readFile } from 'node:fs/promises';
import { join } from 'node:path';
import { z } from 'zod';
export interface IngestProfilePaths {
tracePath: string;
transcriptDir: string;
}
/**
* Post-processor over the ingest trace (`<home>/ingest-traces/<jobId>/trace.jsonl`)
* and per-work-unit tool transcripts. Turns the durations recorded during a run
* into a rolled-up "where did the time go" view. Gated for display by
* `KTX_PROFILE_INGEST`; the durations themselves are always written to the trace.
*/
const traceEventSchema = z
.object({
at: z.string().optional(),
phase: z.string(),
event: z.string(),
durationMs: z.number().optional(),
data: z.record(z.string(), z.unknown()).optional(),
})
.loose();
/** @internal */
export type ProfiledTraceEvent = z.infer<typeof traceEventSchema>;
export interface IngestProfile {
jobId: string;
totalWallMs?: number;
phases: Array<{
phase: string;
totalMs: number;
/** Number of timed (durationMs-bearing) events that contributed to this phase. */
count: number;
}>;
workUnits: Array<{
unitKey: string;
status?: string;
/** Wall-clock for the whole work-unit run (agent loop + validation + git). */
totalMs?: number;
/** Pure `generateText` agent-loop time reported by the runtime. */
agentLoopMs?: number;
/** Summed tool-execution time from the work-unit transcript. */
toolMs?: number;
/** Derived model "thinking" time = agentLoopMs - toolMs (clamped at 0). */
modelMs?: number;
/** Worktree create time. */
createMs?: number;
/** Worktree teardown time. */
cleanupMs?: number;
stepCount?: number;
totalTokens?: number;
}>;
workUnitCount: number;
failedWorkUnitCount: number;
/**
* Plain-language diagnosis plus the raw numbers behind it, so a reader (human
* or coding agent) gets the conclusion without re-deriving it from the tables.
*/
summary: {
/** One-sentence conclusion, e.g. which phase dominated and whether work was model- or tool-bound. */
headline: string;
dominantPhase?: { phase: string; totalMs: number; pctOfWall?: number };
/** Aggregate across all work units, in milliseconds. */
workUnits?: {
count: number;
failed: number;
agentLoopMs: number;
modelMs: number;
toolMs: number;
/** Percent of agent-loop time spent in model generation vs tool execution. */
modelPct?: number;
};
};
}
type IngestWorkUnitTiming = IngestProfile['workUnits'][number];
function asNumber(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}
function asString(value: unknown): string | undefined {
return typeof value === 'string' ? value : undefined;
}
/** @internal */
export function parseTraceEvents(traceText: string): ProfiledTraceEvent[] {
const events: ProfiledTraceEvent[] = [];
for (const line of traceText.split('\n')) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
let json: unknown;
try {
json = JSON.parse(trimmed);
} catch {
continue;
}
const parsed = traceEventSchema.safeParse(json);
if (parsed.success) {
events.push(parsed.data);
}
}
return events;
}
/** @internal */
export function aggregateIngestProfile(input: {
jobId: string;
events: ProfiledTraceEvent[];
toolMsByUnit: Record<string, number>;
}): IngestProfile {
const { jobId, events, toolMsByUnit } = input;
const phaseTotals = new Map<string, { totalMs: number; count: number }>();
const workUnits = new Map<string, IngestWorkUnitTiming>();
const wu = (unitKey: string): IngestWorkUnitTiming => {
let existing = workUnits.get(unitKey);
if (!existing) {
existing = { unitKey };
workUnits.set(unitKey, existing);
}
return existing;
};
let minAt = Number.POSITIVE_INFINITY;
let maxAt = Number.NEGATIVE_INFINITY;
for (const event of events) {
const at = event.at ? Date.parse(event.at) : Number.NaN;
if (!Number.isNaN(at)) {
minAt = Math.min(minAt, at);
maxAt = Math.max(maxAt, at);
}
if (event.durationMs !== undefined) {
const bucket = phaseTotals.get(event.phase) ?? { totalMs: 0, count: 0 };
bucket.totalMs += event.durationMs;
bucket.count += 1;
phaseTotals.set(event.phase, bucket);
}
const data = event.data ?? {};
const unitKey = asString(data.unitKey);
if (unitKey) {
const entry = wu(unitKey);
if (event.event === 'work_unit_executed') {
entry.totalMs = event.durationMs;
entry.agentLoopMs = asNumber(data.agentLoopMs);
entry.stepCount = asNumber(data.stepCount);
entry.totalTokens = asNumber(data.totalTokens);
entry.status = asString(data.status) ?? entry.status;
} else if (event.event === 'work_unit_child_created') {
entry.createMs = event.durationMs;
} else if (event.event === 'work_unit_child_cleanup') {
entry.cleanupMs = event.durationMs;
} else if (event.event === 'work_unit_failed_before_patch') {
entry.status = entry.status ?? 'failed';
}
}
}
for (const [unitKey, entry] of workUnits) {
const toolMs = toolMsByUnit[unitKey];
if (toolMs !== undefined) {
entry.toolMs = toolMs;
if (entry.agentLoopMs !== undefined) {
entry.modelMs = Math.max(0, entry.agentLoopMs - toolMs);
}
} else if (entry.agentLoopMs !== undefined) {
entry.modelMs = entry.agentLoopMs;
}
}
const phases = [...phaseTotals.entries()]
.map(([phase, { totalMs, count }]) => ({ phase, totalMs, count }))
.sort((a, b) => b.totalMs - a.totalMs);
const workUnitList = [...workUnits.values()].sort((a, b) => (b.totalMs ?? 0) - (a.totalMs ?? 0));
const totalWallMs = Number.isFinite(minAt) && Number.isFinite(maxAt) && maxAt >= minAt ? maxAt - minAt : undefined;
const failedWorkUnitCount = workUnitList.filter((entry) => entry.status === 'failed').length;
return {
jobId,
...(totalWallMs !== undefined ? { totalWallMs } : {}),
phases,
workUnits: workUnitList,
workUnitCount: workUnitList.length,
failedWorkUnitCount,
summary: buildSummary(phases, workUnitList, failedWorkUnitCount, totalWallMs),
};
}
function buildSummary(
phases: IngestProfile['phases'],
workUnits: IngestWorkUnitTiming[],
failed: number,
totalWallMs: number | undefined,
): IngestProfile['summary'] {
const dominant = phases[0];
const dominantPhase = dominant
? {
phase: dominant.phase,
totalMs: dominant.totalMs,
...(totalWallMs && totalWallMs > 0
? { pctOfWall: Math.round((dominant.totalMs / totalWallMs) * 100) }
: {}),
}
: undefined;
const agentLoopMs = workUnits.reduce((sum, wu) => sum + (wu.agentLoopMs ?? 0), 0);
const toolMs = workUnits.reduce((sum, wu) => sum + (wu.toolMs ?? 0), 0);
const modelMs = workUnits.reduce((sum, wu) => sum + (wu.modelMs ?? 0), 0);
const workUnitAggregate =
workUnits.length > 0
? {
count: workUnits.length,
failed,
agentLoopMs,
modelMs,
toolMs,
...(agentLoopMs > 0 ? { modelPct: Math.round((modelMs / agentLoopMs) * 100) } : {}),
}
: undefined;
const parts: string[] = [];
if (dominantPhase) {
const pct = dominantPhase.pctOfWall !== undefined ? `, ${dominantPhase.pctOfWall}% of wall time` : '';
parts.push(`Slowest phase: ${dominantPhase.phase} (${formatMs(dominantPhase.totalMs)}${pct})`);
}
if (workUnitAggregate) {
const split =
workUnitAggregate.modelPct !== undefined
? `, ~${workUnitAggregate.modelPct}% model generation vs ~${100 - workUnitAggregate.modelPct}% tools`
: '';
parts.push(
`${workUnitAggregate.count} work unit${workUnitAggregate.count === 1 ? '' : 's'}${
failed > 0 ? ` (${failed} failed)` : ''
}${split}`,
);
}
const headline = parts.length > 0 ? parts.join('. ') + '.' : 'No timed phases recorded.';
return {
headline,
...(dominantPhase ? { dominantPhase } : {}),
...(workUnitAggregate ? { workUnits: workUnitAggregate } : {}),
};
}
/** Read the trace and tool transcripts for a job and aggregate them into a profile. */
export async function readIngestProfile(
jobId: string,
paths: IngestProfilePaths,
): Promise<IngestProfile> {
const traceText = await readFile(paths.tracePath, 'utf-8');
const events = parseTraceEvents(traceText);
const toolMsByUnit = await readToolMsByUnit(paths.transcriptDir);
return aggregateIngestProfile({ jobId, events, toolMsByUnit });
}
async function listTranscriptFiles(dir: string): Promise<string[]> {
// Work-unit keys can contain slashes (e.g. "cards/marketing"), so the runner
// writes nested transcript files (".../cards/marketing.jsonl"). Walk
// recursively and bucket by the `wuKey` field inside each entry rather than
// by file name.
const entries = await readdir(dir, { withFileTypes: true }).catch(() => null);
if (!entries) {
return [];
}
const files: string[] = [];
for (const entry of entries) {
const full = join(dir, entry.name);
if (entry.isDirectory()) {
files.push(...(await listTranscriptFiles(full)));
} else if (entry.isFile() && entry.name.endsWith('.jsonl')) {
files.push(full);
}
}
return files;
}
async function readToolMsByUnit(transcriptDir: string): Promise<Record<string, number>> {
const toolMs: Record<string, number> = {};
for (const file of await listTranscriptFiles(transcriptDir)) {
let text: string;
try {
text = await readFile(file, 'utf-8');
} catch {
continue;
}
for (const line of text.split('\n')) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
try {
const entry = JSON.parse(trimmed) as { wuKey?: unknown; durationMs?: unknown };
const wuKey = asString(entry.wuKey);
const ms = asNumber(entry.durationMs);
if (wuKey && ms !== undefined) {
toolMs[wuKey] = (toolMs[wuKey] ?? 0) + ms;
}
} catch {
// skip malformed line
}
}
}
return toolMs;
}
function formatMs(ms: number | undefined): string {
if (ms === undefined) {
return '—';
}
if (ms < 1000) {
return `${Math.round(ms)}ms`;
}
const seconds = ms / 1000;
if (seconds < 60) {
return `${seconds.toFixed(1)}s`;
}
const minutes = Math.floor(seconds / 60);
const rem = Math.round(seconds - minutes * 60);
return `${minutes}m ${String(rem).padStart(2, '0')}s`;
}
function formatTokens(tokens: number | undefined): string {
if (tokens === undefined) {
return '—';
}
if (tokens < 1000) {
return String(tokens);
}
return `${(tokens / 1000).toFixed(1)}k`;
}
function pad(value: string, width: number): string {
return value.length >= width ? value : value + ' '.repeat(width - value.length);
}
function padStart(value: string, width: number): string {
return value.length >= width ? value : ' '.repeat(width - value.length) + value;
}
/** Render a human-readable profile table for stderr / the admin command. */
export function formatIngestProfile(profile: IngestProfile, options: { topWorkUnits?: number } = {}): string {
const topWorkUnits = options.topWorkUnits ?? 10;
const lines: string[] = [];
lines.push(`ktx ingest profile — job ${profile.jobId}`);
if (profile.totalWallMs !== undefined) {
lines.push(` total wall time: ${formatMs(profile.totalWallMs)}`);
}
lines.push(` ${profile.summary.headline}`);
const wall = profile.totalWallMs;
lines.push('');
lines.push(' Phase breakdown (by total duration):');
if (profile.phases.length === 0) {
lines.push(' (no timed phases recorded)');
}
for (const phase of profile.phases) {
const pct = wall && wall > 0 ? `(${((phase.totalMs / wall) * 100).toFixed(1)}%)` : '';
lines.push(
` ${pad(phase.phase, 22)}${padStart(formatMs(phase.totalMs), 9)} ${padStart(pct, 8)} ${padStart(
String(phase.count),
4,
)} event${phase.count === 1 ? '' : 's'}`,
);
}
if (profile.workUnits.length > 0) {
lines.push('');
lines.push(` Work units (top ${Math.min(topWorkUnits, profile.workUnits.length)} slowest):`);
lines.push(
` ${pad('unitKey', 30)}${padStart('total', 9)}${padStart('model', 9)}${padStart('tool', 9)}${padStart(
'steps',
8,
)}${padStart('tokens', 9)} status`,
);
for (const entry of profile.workUnits.slice(0, topWorkUnits)) {
const steps = entry.stepCount !== undefined ? String(entry.stepCount) : '—';
lines.push(
` ${pad(entry.unitKey.slice(0, 30), 30)}${padStart(formatMs(entry.totalMs), 9)}${padStart(
formatMs(entry.modelMs),
9,
)}${padStart(formatMs(entry.toolMs), 9)}${padStart(steps, 8)}${padStart(
formatTokens(entry.totalTokens),
9,
)} ${entry.status ?? '—'}`,
);
}
lines.push(
` (${profile.workUnitCount} work unit${profile.workUnitCount === 1 ? '' : 's'} total; ${
profile.failedWorkUnitCount
} failed)`,
);
}
return `${lines.join('\n')}\n`;
}
/**
* Machine-readable rendering for coding agents: the full structured profile
* (raw milliseconds and token counts, stable keys) as a single JSON object
* under a stable marker line so it is easy to locate and parse in stderr.
*/
export function formatIngestProfileJson(profile: IngestProfile): string {
return `ktx ingest profile (json)\n${JSON.stringify(profile, null, 2)}\n`;
}
export type IngestProfileMode = 'off' | 'table' | 'json';
/**
* Resolve how (and whether) to emit the ingest profile, from the
* `ingest.profile` config value and the `KTX_PROFILE_INGEST` env var. Either
* source may request `json` (raw, agent-friendly) or a human `table`; `json`
* wins if either asks for it.
*/
export function resolveIngestProfileMode(
configValue: boolean | 'json' | undefined,
env: NodeJS.ProcessEnv = process.env,
): IngestProfileMode {
const envValue = env.KTX_PROFILE_INGEST;
if (configValue === 'json' || envValue === 'json') {
return 'json';
}
const wantsTable =
configValue === true || envValue === '1' || envValue === 'true' || envValue === 'table';
return wantsTable ? 'table' : 'off';
}

View file

@ -26,16 +26,52 @@ function patchFileName(unitIndex: number, unitKey: string): string {
export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Promise<WorkUnitOutcome> {
const sessionKey = `${input.trace.context.jobId}-${input.workUnit.unitKey}`;
let cleanupOutcome: SessionOutcome = 'crash';
const createStartedAt = Date.now();
const child = await input.sessionWorktreeService.create(sessionKey, input.ingestionBaseSha);
await input.trace.event('debug', 'work_unit', 'work_unit_child_created', {
unitKey: input.workUnit.unitKey,
unitIndex: input.unitIndex,
worktreePath: child.workdir,
baseSha: input.ingestionBaseSha,
});
await input.trace.event(
'debug',
'work_unit',
'work_unit_child_created',
{
unitKey: input.workUnit.unitKey,
unitIndex: input.unitIndex,
worktreePath: child.workdir,
baseSha: input.ingestionBaseSha,
},
undefined,
Date.now() - createStartedAt,
);
try {
const runStartedAt = Date.now();
const outcome = await input.run(child);
await input.trace.event(
'debug',
'work_unit',
'work_unit_executed',
{
unitKey: input.workUnit.unitKey,
unitIndex: input.unitIndex,
status: outcome.status,
...(outcome.metrics
? {
agentLoopMs: outcome.metrics.totalMs,
stepCount: outcome.metrics.stepCount,
...(outcome.metrics.usage.inputTokens !== undefined
? { inputTokens: outcome.metrics.usage.inputTokens }
: {}),
...(outcome.metrics.usage.outputTokens !== undefined
? { outputTokens: outcome.metrics.usage.outputTokens }
: {}),
...(outcome.metrics.usage.totalTokens !== undefined
? { totalTokens: outcome.metrics.usage.totalTokens }
: {}),
}
: {}),
},
undefined,
Date.now() - runStartedAt,
);
if (outcome.status !== 'success') {
cleanupOutcome = 'success';
await input.trace.event('error', 'work_unit', 'work_unit_failed_before_patch', {
@ -75,11 +111,19 @@ export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Prom
cleanupOutcome = 'success';
throw error;
} finally {
const cleanupStartedAt = Date.now();
await input.sessionWorktreeService.cleanup(child, cleanupOutcome);
await input.trace.event('trace', 'work_unit', 'work_unit_child_cleanup', {
unitKey: input.workUnit.unitKey,
outcome: cleanupOutcome,
worktreePath: child.workdir,
});
await input.trace.event(
'trace',
'work_unit',
'work_unit_child_cleanup',
{
unitKey: input.workUnit.unitKey,
outcome: cleanupOutcome,
worktreePath: child.workdir,
},
undefined,
Date.now() - cleanupStartedAt,
);
}
}

View file

@ -716,6 +716,7 @@ export function createLocalBundleIngestRuntime(
workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency,
workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget,
workUnitFailureMode: options.project.config.ingest.workUnits.failureMode,
profileIngest: options.project.config.ingest.profile,
ingestTraceLevel: ingestTraceLevelFromEnv(),
},
skillsRegistry: new SkillsRegistryService({ skillsDir, logger }),

View file

@ -144,6 +144,8 @@ interface IngestSettingsPort {
workUnitMaxConcurrency?: number;
workUnitStepBudget?: number;
workUnitFailureMode?: 'abort' | 'continue';
/** Print a timing breakdown to stderr at the end of each run (config-driven; see also KTX_PROFILE_INGEST). `'json'` emits the raw structured profile. */
profileIngest?: boolean | 'json';
ingestTraceLevel?: IngestTraceLevel;
}

View file

@ -1,5 +1,5 @@
import type { KtxModelRole } from '../../../llm/types.js';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../../context/llm/runtime-port.js';
import type { AgentRunnerPort, KtxRuntimeToolSet, RunLoopMetrics } from '../../../context/llm/runtime-port.js';
import type { CaptureSession, MemoryAction } from '../../../context/memory/types.js';
import { listTouchedSlSources, type TouchedSlSource } from '../../../context/tools/touched-sl-sources.js';
import type { WorkUnit } from '../types.js';
@ -44,6 +44,8 @@ export interface WorkUnitOutcome {
patchPath?: string;
patchTouchedPaths?: string[];
childWorktreePath?: string;
/** Timing and token metrics for the work-unit agent loop, used for ingest profiling. */
metrics?: RunLoopMetrics;
}
export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit): Promise<WorkUnitOutcome> {
@ -125,6 +127,7 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit)
touchedSlSources: [],
slDisallowed: wu.slDisallowed,
slDisallowedReason: wu.slDisallowedReason,
...(runResult.metrics ? { metrics: runResult.metrics } : {}),
};
};
@ -162,5 +165,6 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit)
touchedSlSources: touched,
slDisallowed: wu.slDisallowed,
slDisallowedReason: wu.slDisallowedReason,
...(runResult.metrics ? { metrics: runResult.metrics } : {}),
};
}

View file

@ -1,4 +1,4 @@
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../../context/llm/runtime-port.js';
import type { AgentRunnerPort, KtxRuntimeToolSet, RunLoopMetrics } from '../../../context/llm/runtime-port.js';
import type { KtxModelRole } from '../../../llm/types.js';
import type { EvictionUnit } from '../types.js';
import type { StageIndex } from './stage-index.types.js';
@ -23,6 +23,7 @@ export interface ReconciliationOutcome {
skipped: boolean;
stopReason?: 'budget' | 'natural' | 'error';
error?: Error;
metrics?: RunLoopMetrics;
}
export async function runReconciliationStage4(ctx: ReconciliationContext): Promise<ReconciliationOutcome> {
@ -40,5 +41,5 @@ export async function runReconciliationStage4(ctx: ReconciliationContext): Promi
telemetryTags: { operationName: 'ingest-bundle-reconcile', source: ctx.sourceKey, jobId: ctx.jobId },
onStepFinish: ctx.onStepFinish,
});
return { skipped: false, stopReason: run.stopReason, error: run.error };
return { skipped: false, stopReason: run.stopReason, error: run.error, ...(run.metrics ? { metrics: run.metrics } : {}) };
}

View file

@ -81,8 +81,13 @@ export function wrapToolsWithLogger<T extends KtxRuntimeToolSet>(
return wrapped as T;
}
// Fire-and-forget appends are intentional (the agent hot path must never block
// or fail on logging), but readers like the ingest profiler need to know when
// the writes have settled. Track in-flight appends so a consumer can flush.
const pendingWrites = new Set<Promise<void>>();
function appendEntry(path: string, entry: ToolCallLogEntry): void {
void (async () => {
const write = (async () => {
try {
await mkdir(dirname(path), { recursive: true });
await appendFile(path, `${safeStringify(entry)}\n`, 'utf-8');
@ -90,6 +95,37 @@ function appendEntry(path: string, entry: ToolCallLogEntry): void {
// best-effort
}
})();
pendingWrites.add(write);
void write.finally(() => pendingWrites.delete(write));
}
/**
* Await all in-flight tool-call log writes (best-effort, bounded by `timeoutMs`
* so it can never hang a caller). Lets readers such as the ingest profiler see
* complete transcripts despite the fire-and-forget append design.
*/
export async function flushToolCallLogs(timeoutMs = 5000): Promise<void> {
const pending = [...pendingWrites];
if (pending.length === 0) {
return;
}
const settled = Promise.allSettled(pending).then(() => undefined);
if (timeoutMs <= 0) {
await settled;
return;
}
let timer: ReturnType<typeof setTimeout> | undefined;
const timeout = new Promise<void>((resolve) => {
timer = setTimeout(resolve, timeoutMs);
timer.unref?.();
});
try {
await Promise.race([settled, timeout]);
} finally {
if (timer) {
clearTimeout(timer);
}
}
}
function safeStringify(v: unknown): string {

View file

@ -9,6 +9,7 @@ import type {
KtxGenerateObjectInput,
KtxGenerateTextInput,
KtxLlmRuntimePort,
LlmTokenUsage,
RunLoopParams,
RunLoopResult,
} from './runtime-port.js';
@ -17,6 +18,23 @@ interface AgentTelemetryPort {
createTelemetry(tags: Record<string, string>): TelemetrySettings;
}
interface MaybeUsage {
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;
}
function toLlmTokenUsage(usage: MaybeUsage | undefined): LlmTokenUsage {
if (!usage) {
return {};
}
return {
...(usage.inputTokens !== undefined ? { inputTokens: usage.inputTokens } : {}),
...(usage.outputTokens !== undefined ? { outputTokens: usage.outputTokens } : {}),
...(usage.totalTokens !== undefined ? { totalTokens: usage.totalTokens } : {}),
};
}
export interface AiSdkKtxLlmRuntimeDeps {
llmProvider: KtxLlmProvider;
telemetry?: AgentTelemetryPort;
@ -48,6 +66,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
model,
});
const split = splitKtxSystemMessages(built.messages);
const startedAt = Date.now();
const result = await generateText({
model,
temperature: input.temperature ?? 0,
@ -62,6 +81,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
}
: {}),
});
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: toLlmTokenUsage(result.totalUsage ?? result.usage) });
if (typeof result.text !== 'string') {
throw new Error('KTX LLM text generation returned no text');
}
@ -80,6 +100,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
model,
});
const split = splitKtxSystemMessages(built.messages);
const startedAt = Date.now();
const result = await generateText({
model,
temperature: input.temperature ?? 0,
@ -95,6 +116,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
: {}),
output: Output.object({ schema: input.schema as unknown as FlexibleSchema<TOutput> }),
});
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: toLlmTokenUsage(result.totalUsage ?? result.usage) });
if (result.output == null) {
throw new Error('KTX LLM object generation returned no output');
}
@ -103,6 +125,8 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> {
let stepIndex = 0;
const startedAt = Date.now();
const stepBoundariesMs: number[] = [];
try {
const model = this.deps.llmProvider.getModel(params.modelRole);
const tools = createAiSdkToolSet(params.toolSet);
@ -128,7 +152,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
}),
);
await generateText({
const result = await generateText({
model,
temperature: 0,
stopWhen: stepCountIs(params.stepBudget),
@ -141,6 +165,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
tools: built.tools as ToolSet,
onStepFinish: async () => {
stepIndex += 1;
stepBoundariesMs.push(Date.now() - startedAt);
if (!params.onStepFinish) {
return;
}
@ -155,11 +180,23 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
}
},
});
return { stopReason: 'natural' };
return {
stopReason: 'natural',
metrics: {
totalMs: Date.now() - startedAt,
stepCount: stepIndex,
stepBoundariesMs,
usage: toLlmTokenUsage(result.totalUsage ?? result.usage),
},
};
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
this.logger.warn(`[agent-runner] loop failed: ${err.message}`);
return { stopReason: 'error', error: err };
return {
stopReason: 'error',
error: err,
metrics: { totalMs: Date.now() - startedAt, stepCount: stepIndex, stepBoundariesMs, usage: {} },
};
}
}
}

View file

@ -15,6 +15,7 @@ import type {
KtxGenerateTextInput,
KtxLlmRuntimePort,
KtxRuntimeToolSet,
LlmTokenUsage,
RunLoopParams,
RunLoopResult,
RunLoopStopReason,
@ -22,6 +23,20 @@ import type {
type QueryFn = (params: Parameters<typeof defaultQuery>[0]) => AsyncIterable<SDKMessage>;
function claudeTokenUsage(result: SDKResultMessage): LlmTokenUsage {
const usage = (result as { usage?: { input_tokens?: number; output_tokens?: number } }).usage;
if (!usage) {
return {};
}
const { input_tokens: inputTokens, output_tokens: outputTokens } = usage;
const totalTokens = inputTokens !== undefined && outputTokens !== undefined ? inputTokens + outputTokens : undefined;
return {
...(inputTokens !== undefined ? { inputTokens } : {}),
...(outputTokens !== undefined ? { outputTokens } : {}),
...(totalTokens !== undefined ? { totalTokens } : {}),
};
}
export interface ClaudeCodeKtxLlmRuntimeDeps {
projectDir: string;
modelSlots: { default: string } & Partial<Record<string, string>>;
@ -236,6 +251,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
maxTurns: 1,
tools: input.tools,
});
const startedAt = Date.now();
const result = await collectResult({
query: this.runQuery,
prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'),
@ -243,6 +259,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
allowedToolIds: new Set(mcpToolIds(input.tools ?? {})),
expectedMcpServerNames: expectedMcpServerNames(input.tools),
});
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: claudeTokenUsage(result) });
const error = resultError(result);
if (error) {
throw error;
@ -271,6 +288,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
}),
outputFormat: { type: 'json_schema' as const, schema: jsonSchema(input.schema as z.ZodType) },
};
const startedAt = Date.now();
const result = await collectResult({
query: this.runQuery,
prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'),
@ -278,6 +296,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
allowedToolIds: new Set([...mcpToolIds(input.tools ?? {}), STRUCTURED_OUTPUT_TOOL_NAME]),
expectedMcpServerNames: expectedMcpServerNames(input.tools),
});
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: claudeTokenUsage(result) });
const error = resultError(result);
if (error) {
throw error;
@ -290,6 +309,8 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> {
let stepIndex = 0;
const startedAt = Date.now();
const stepBoundariesMs: number[] = [];
try {
const options = baseOptions({
projectDir: this.deps.projectDir,
@ -306,6 +327,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
expectedMcpServerNames: expectedMcpServerNames(params.toolSet),
onAssistantTurn: async () => {
stepIndex += 1;
stepBoundariesMs.push(Date.now() - startedAt);
if (!params.onStepFinish) {
return;
}
@ -322,10 +344,23 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
});
const stopReason = mapClaudeCodeStopReason(result);
const error = resultError(result);
return { stopReason, ...(stopReason === 'error' && error ? { error } : {}) };
return {
stopReason,
...(stopReason === 'error' && error ? { error } : {}),
metrics: {
totalMs: Date.now() - startedAt,
stepCount: stepIndex,
stepBoundariesMs,
usage: claudeTokenUsage(result),
},
};
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
return { stopReason: 'error', error: err };
return {
stopReason: 'error',
error: err,
metrics: { totalMs: Date.now() - startedAt, stepCount: stepIndex, stepBoundariesMs, usage: {} },
};
}
}
}

View file

@ -23,6 +23,24 @@ export interface RunLoopStepInfo {
stepBudget: number;
}
export interface LlmTokenUsage {
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;
}
/** Timing and token metrics for a multi-step agent loop, used for ingest profiling. */
export interface RunLoopMetrics {
/** Wall-clock time around the whole `generateText` call, in milliseconds. */
totalMs: number;
/** Aggregate token usage across all steps. */
usage: LlmTokenUsage;
/** Number of agent steps (model round-trips) that actually ran. */
stepCount: number;
/** Wall-clock offset (ms from loop start) at which each step finished. */
stepBoundariesMs: number[];
}
export interface RunLoopParams {
modelRole: KtxModelRole;
systemPrompt: string;
@ -36,6 +54,7 @@ export interface RunLoopParams {
export interface RunLoopResult {
stopReason: RunLoopStopReason;
error?: Error;
metrics?: RunLoopMetrics;
}
export interface KtxGenerateTextInput {
@ -44,6 +63,7 @@ export interface KtxGenerateTextInput {
system?: string;
tools?: KtxRuntimeToolSet;
temperature?: number;
onMetrics?: (metrics: { totalMs: number; usage: LlmTokenUsage }) => void;
}
export interface KtxGenerateObjectInput<TOutput, TSchema extends z.ZodType<TOutput>> {
@ -53,6 +73,7 @@ export interface KtxGenerateObjectInput<TOutput, TSchema extends z.ZodType<TOutp
tools?: KtxRuntimeToolSet;
temperature?: number;
schema: TSchema;
onMetrics?: (metrics: { totalMs: number; usage: LlmTokenUsage }) => void;
}
export interface KtxLlmRuntimePort {

View file

@ -110,6 +110,12 @@ const ingestSchema = z
.prefault({ backend: 'none' })
.describe('Embedding configuration used when ingest adapters need to embed documents.'),
workUnits: workUnitsSchema.prefault({}).describe('Concurrency and failure handling for ingest work units.'),
profile: z
.union([z.boolean(), z.literal('json')])
.default(false)
.describe(
'Print a timing breakdown to stderr at the end of each ingest run. `true` prints a human table; `"json"` prints the raw structured profile for coding agents; `false` disables it. Equivalent to the KTX_PROFILE_INGEST environment variable (`1`/`true`/`json`).',
),
})
.describe('Ingest pipeline configuration: adapters, embeddings, and work-unit policy.');