feat(cli): profile ingest runs and split model vs tool time (#249)

* feat(cli): profile ingest runs to find where wall-clock time goes

Add opt-in profiling for `ktx ingest`. Each timed phase, work unit, and
agent loop now records durationMs / step count / token usage in the
trace, and a post-run aggregator rolls them up into a "where did the
time go" report printed to stderr.

Enable per run with KTX_PROFILE_INGEST (1/true -> human table, json ->
raw structured profile) or persistently via `ingest.profile` in
ktx.yaml. The json form emits raw milliseconds, token counts, and a
summary.headline one-line diagnosis so coding agents can parse it
directly; json wins when both env and config request profiling.

- runtime-port: RunLoopMetrics (totalMs, usage, stepCount,
  stepBoundariesMs) plus onMetrics callbacks on text/object generation
- ai-sdk + claude-code runtimes: capture per-loop timing and token usage
- work-unit-executor and stages 3/4: thread metrics into trace events
- ingest-bundle.runner: time worktree / triage / clustering / index /
  reconcile / squash phases and emit the profile in a finally block
  (best-effort; never affects the run outcome)
- ingest-profile: new trace+transcript aggregator with table/json formatters
- config: ingest.profile flag; docs: profiling section in ktx-ingest.mdx

* fix(cli): flush tool-call logs before reading ingest profile

Tool transcripts are appended fire-and-forget so the agent hot path never
blocks on logging. The ingest profiler read them before the writes settled,
so per-work-unit toolMs (and the model-vs-tool split derived from it) could
be incomplete. Track in-flight appends and expose flushToolCallLogs() —
bounded by a timeout so it can never hang — and flush before the profiler
reads the transcript.
This commit is contained in:
Andrey Avtomonov 2026-06-01 15:49:17 +02:00 committed by GitHub
parent 22ddf5524c
commit 21744fc520
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 1243 additions and 56 deletions

View file

@ -143,6 +143,42 @@ verbosity:
KTX_INGEST_TRACE_LEVEL=trace ktx ingest metabase KTX_INGEST_TRACE_LEVEL=trace ktx ingest metabase
``` ```
### Profiling a slow ingest
Each timed phase and work unit records a `durationMs` in the trace, and each
agent loop records its step count and token usage. To see where wall-clock time
went, enable profiling and **ktx** prints a rolled-up breakdown to stderr at the
end of the run. There are two ways to turn it on, and two output formats.
Turn it on per run with the `KTX_PROFILE_INGEST` environment variable, or
persistently with `ingest.profile` in `ktx.yaml` (useful for CI or while
iterating on a slow source):
```bash
KTX_PROFILE_INGEST=1 ktx ingest metabase # human-readable table
KTX_PROFILE_INGEST=json ktx ingest metabase # raw JSON for coding agents
```
```yaml
ingest:
profile: true # human table; use "json" for the machine-readable form
```
Both formats report total wall time, time per phase, and the slowest work units,
splitting each work unit's agent-loop time into model time versus tool-execution
time. The `json` form emits the full structured profile (raw milliseconds and
token counts, stable keys) plus a `summary.headline` one-line diagnosis, so a
coding agent can parse it directly instead of scraping the table. If both the env
var and the config request profiling, `json` wins. Example headline:
```text
Slowest phase: reconciliation (2m 05s, 48% of wall time). 2 work units (1 failed), ~88% model generation vs ~12% tools.
```
Work units run serially by default (`ingest.workUnits.maxConcurrency` is `1`);
raise it in `ktx.yaml` if the profile shows the run is bound by serialized
work-unit agent loops.
## Common errors ## Common errors
| Error | Cause | Recovery | | Error | Cause | Recovery |

View file

@ -25,6 +25,7 @@ import {
deriveFinalizationWikiPageKeys, deriveFinalizationWikiPageKeys,
} from './finalization-scope.js'; } from './finalization-scope.js';
import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js'; import { FileIngestTraceWriter, ingestTracePathForJob, type IngestTraceWriter, traceTimed } from './ingest-trace.js';
import { formatIngestProfile, formatIngestProfileJson, readIngestProfile, resolveIngestProfileMode } from './ingest-profile.js';
import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js'; import { integrateWorkUnitPatch } from './isolated-diff/patch-integrator.js';
import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolver.js'; import { resolveTextualConflict } from './isolated-diff/textual-conflict-resolver.js';
import { runIsolatedWorkUnit } from './isolated-diff/work-unit-executor.js'; import { runIsolatedWorkUnit } from './isolated-diff/work-unit-executor.js';
@ -69,7 +70,7 @@ import { createEvictionListTool } from './tools/eviction-list.tool.js';
import { createReadRawSpanTool } from './tools/read-raw-span.tool.js'; import { createReadRawSpanTool } from './tools/read-raw-span.tool.js';
import { createStageDiffTool } from './tools/stage-diff.tool.js'; import { createStageDiffTool } from './tools/stage-diff.tool.js';
import { createStageListTool } from './tools/stage-list.tool.js'; import { createStageListTool } from './tools/stage-list.tool.js';
import { type ToolCallLogEntry, wrapToolsWithLogger } from './tools/tool-call-logger.js'; import { flushToolCallLogs, type ToolCallLogEntry, wrapToolsWithLogger } from './tools/tool-call-logger.js';
import { import {
createMutableToolTranscriptSummary, createMutableToolTranscriptSummary,
recordToolTranscriptEntry, recordToolTranscriptEntry,
@ -239,6 +240,41 @@ export class IngestBundleRunner {
} catch (error) { } catch (error) {
ctx?.memoryFlow?.finish('error', [sanitizeMemoryFlowError(error)]); ctx?.memoryFlow?.finish('error', [sanitizeMemoryFlowError(error)]);
throw error; throw error;
} finally {
await this.maybeEmitIngestProfile(job.jobId);
}
}
/**
* When profiling is enabled via the `KTX_PROFILE_INGEST` env var or the
* `ingest.profile` config setting read the job's trace + tool transcripts
* and print a rolled-up timing breakdown to stderr. `json` emits the raw
* structured profile for coding agents; `table` emits a human summary.
* Best-effort: profiling never affects the run outcome.
*/
private async maybeEmitIngestProfile(jobId: string): Promise<void> {
const mode = resolveIngestProfileMode(this.deps.settings.profileIngest);
if (mode === 'off') {
return;
}
try {
// Tool transcripts are appended fire-and-forget; flush them so per-work-unit
// toolMs (and the derived model-vs-tool split) is complete before we read.
await flushToolCallLogs();
const storage = this.deps.storage as typeof this.deps.storage & {
resolveTracePath?: (jobId: string) => string;
};
const profile = await readIngestProfile(jobId, {
tracePath: storage.resolveTracePath?.(jobId) ?? ingestTracePathForJob(this.deps.storage.homeDir, jobId),
transcriptDir: this.deps.storage.resolveTranscriptDir(jobId),
});
process.stderr.write(`\n${mode === 'json' ? formatIngestProfileJson(profile) : formatIngestProfile(profile)}`);
} catch (error) {
this.logger.warn(
`[ingest-bundle] ingest profile unavailable for job=${jobId}: ${
error instanceof Error ? error.message : String(error)
}`,
);
} }
} }
@ -1100,8 +1136,15 @@ export class IngestBundleRunner {
const scopeDescriptor = adapter.describeScope ? await adapter.describeScope(stagedDir) : null; const scopeDescriptor = adapter.describeScope ? await adapter.describeScope(stagedDir) : null;
const sessionWorktree = await this.deps.lockingService.withLock('config:repo', () => const sessionWorktree = await traceTimed(
this.deps.sessionWorktreeService.create(job.jobId, baseSha), trace,
'worktree',
'session_worktree_created',
{ jobId: job.jobId },
() =>
this.deps.lockingService.withLock('config:repo', () =>
this.deps.sessionWorktreeService.create(job.jobId, baseSha),
),
); );
let cleanupOutcome: 'success' | 'crash' | 'conflict' = 'crash'; let cleanupOutcome: 'success' | 'crash' | 'conflict' = 'crash';
@ -1272,26 +1315,34 @@ export class IngestBundleRunner {
sourceContextReport = chunk.contextReport; sourceContextReport = chunk.contextReport;
parseArtifacts = chunk.parseArtifacts; parseArtifacts = chunk.parseArtifacts;
reconcileNotes = chunk.reconcileNotes ?? []; reconcileNotes = chunk.reconcileNotes ?? [];
const pageTriage = this.deps.pageTriage;
const triageRunId = runRow.id;
triageResult = triageResult =
contextReport && adapter.triageSupported && this.deps.pageTriage contextReport && adapter.triageSupported && pageTriage
? await this.deps.pageTriage.triageRun({ ? await traceTimed(runTrace, 'triage', 'page_triage', { sourceKey: job.sourceKey }, () =>
stagedDir, pageTriage.triageRun({
runId: runRow.id, stagedDir,
connectionId: job.connectionId, runId: triageRunId,
sourceKey: job.sourceKey, connectionId: job.connectionId,
syncId, sourceKey: job.sourceKey,
jobId: job.jobId, syncId,
diffSet, jobId: job.jobId,
adapter, diffSet,
}) adapter,
}),
)
: null; : null;
workUnits = this.filterWorkUnitsForTriage(workUnits, triageResult); workUnits = this.filterWorkUnitsForTriage(workUnits, triageResult);
if (adapter.clusterWorkUnits && workUnits.length > 0) { const clusterWorkUnits = adapter.clusterWorkUnits;
workUnits = await adapter.clusterWorkUnits({ if (clusterWorkUnits && workUnits.length > 0) {
workUnits, const preClusterCount = workUnits.length;
stagedDir, workUnits = await traceTimed(
embedding: this.deps.embedding, runTrace,
}); 'clustering',
'cluster_work_units',
{ workUnitCount: preClusterCount },
() => clusterWorkUnits({ workUnits, stagedDir, embedding: this.deps.embedding }),
);
} }
await stage2?.updateProgress(1.0, `Planned ${workUnits.length} update${workUnits.length === 1 ? '' : 's'}`); await stage2?.updateProgress(1.0, `Planned ${workUnits.length} update${workUnits.length === 1 ? '' : 's'}`);
} }
@ -1326,7 +1377,13 @@ export class IngestBundleRunner {
}); });
// Build shared per-job context. // Build shared per-job context.
const [wikiIndex, slIndex] = await Promise.all([this.buildWikiIndex(), this.buildSlIndex(slConnectionIds)]); const [wikiIndex, slIndex] = await traceTimed(
runTrace,
'index_build',
'build_indexes',
{ connectionCount: slConnectionIds.length },
() => Promise.all([this.buildWikiIndex(), this.buildSlIndex(slConnectionIds)]),
);
const baseFraming = await this.deps.promptService.loadPrompt('memory_agent_bundle_ingest_work_unit'); const baseFraming = await this.deps.promptService.loadPrompt('memory_agent_bundle_ingest_work_unit');
const wuSkillNames = Array.from( const wuSkillNames = Array.from(
@ -1881,6 +1938,8 @@ export class IngestBundleRunner {
let curatorWarnings: string[] = []; let curatorWarnings: string[] = [];
let reconcileOutcome: Awaited<ReturnType<typeof runReconciliationStage4>>; let reconcileOutcome: Awaited<ReturnType<typeof runReconciliationStage4>>;
const reconcileStartedAt = Date.now();
const reconcileMode = contextReport && this.deps.curatorPagination ? 'curator' : 'single';
if (contextReport && this.deps.curatorPagination) { if (contextReport && this.deps.curatorPagination) {
const curatorOutcome = await this.deps.curatorPagination.reconcile({ const curatorOutcome = await this.deps.curatorPagination.reconcile({
runId: runRow.id, runId: runRow.id,
@ -1989,6 +2048,33 @@ export class IngestBundleRunner {
: undefined, : undefined,
}); });
} }
await runTrace.event(
'debug',
'reconciliation',
'reconciliation_executed',
{
mode: reconcileMode,
skipped: reconcileOutcome.skipped,
...(reconcileOutcome.stopReason ? { stopReason: reconcileOutcome.stopReason } : {}),
...(reconcileOutcome.metrics
? {
agentLoopMs: reconcileOutcome.metrics.totalMs,
stepCount: reconcileOutcome.metrics.stepCount,
...(reconcileOutcome.metrics.usage.inputTokens !== undefined
? { inputTokens: reconcileOutcome.metrics.usage.inputTokens }
: {}),
...(reconcileOutcome.metrics.usage.outputTokens !== undefined
? { outputTokens: reconcileOutcome.metrics.usage.outputTokens }
: {}),
...(reconcileOutcome.metrics.usage.totalTokens !== undefined
? { totalTokens: reconcileOutcome.metrics.usage.totalTokens }
: {}),
}
: {}),
},
undefined,
Date.now() - reconcileStartedAt,
);
latestReconciliationSkipped = reconcileOutcome.skipped; latestReconciliationSkipped = reconcileOutcome.skipped;
const danglingReconcileWikiRefs = await findDanglingWikiRefsForActions({ const danglingReconcileWikiRefs = await findDanglingWikiRefsForActions({
@ -2036,6 +2122,7 @@ export class IngestBundleRunner {
activePhase = 'finalization'; activePhase = 'finalization';
if (adapter.finalize) { if (adapter.finalize) {
const stageFinalization = ctx?.startPhase(0.04); const stageFinalization = ctx?.startPhase(0.04);
const finalizationStartedAt = Date.now();
emitStageProgress('finalization', 87, 'Running deterministic finalization'); emitStageProgress('finalization', 87, 'Running deterministic finalization');
await stageFinalization?.updateProgress(0.0, 'Running deterministic finalization'); await stageFinalization?.updateProgress(0.0, 'Running deterministic finalization');
await runTrace.event('debug', 'finalization', 'finalization_started', { sourceKey: job.sourceKey }); await runTrace.event('debug', 'finalization', 'finalization_started', { sourceKey: job.sourceKey });
@ -2215,14 +2302,21 @@ export class IngestBundleRunner {
latestFinalizationOutcome = finalizationOutcome; latestFinalizationOutcome = finalizationOutcome;
emitStageProgress('finalization', 88, 'Deterministic finalization complete'); emitStageProgress('finalization', 88, 'Deterministic finalization complete');
await stageFinalization?.updateProgress(1.0, 'Deterministic finalization complete'); await stageFinalization?.updateProgress(1.0, 'Deterministic finalization complete');
await runTrace.event('debug', 'finalization', 'finalization_committed', { await runTrace.event(
sourceKey: job.sourceKey, 'debug',
commitSha: finalizationSha, 'finalization',
touchedPaths: finalizationTouchedPaths, 'finalization_committed',
touchedSources: finalizationTouchedSources, {
changedWikiPageKeys: finalizationChangedWikiPageKeys, sourceKey: job.sourceKey,
warnings: result.warnings, commitSha: finalizationSha,
}); touchedPaths: finalizationTouchedPaths,
touchedSources: finalizationTouchedSources,
changedWikiPageKeys: finalizationChangedWikiPageKeys,
warnings: result.warnings,
},
undefined,
Date.now() - finalizationStartedAt,
);
} else { } else {
await runTrace.event('debug', 'finalization', 'finalization_skipped', { sourceKey: job.sourceKey }); await runTrace.event('debug', 'finalization', 'finalization_skipped', { sourceKey: job.sourceKey });
} }
@ -2504,6 +2598,7 @@ export class IngestBundleRunner {
const stage6 = ctx?.startPhase(0.04); const stage6 = ctx?.startPhase(0.04);
emitStageProgress('save', 91, 'Saving changes'); emitStageProgress('save', 91, 'Saving changes');
await stage6?.updateProgress(0.0, 'Saving changes'); await stage6?.updateProgress(0.0, 'Saving changes');
const squashStartedAt = Date.now();
try { try {
await sessionWorktree.git.assertWorktreeClean(); await sessionWorktree.git.assertWorktreeClean();
} catch (error) { } catch (error) {
@ -2527,10 +2622,17 @@ export class IngestBundleRunner {
throw new Error(`squash merge conflict: ${mergeResult.conflictPaths.join(', ')}`); throw new Error(`squash merge conflict: ${mergeResult.conflictPaths.join(', ')}`);
} }
const commitSha = mergeResult.touchedPaths.length === 0 ? null : mergeResult.squashSha; const commitSha = mergeResult.touchedPaths.length === 0 ? null : mergeResult.squashSha;
await runTrace.event('debug', 'squash', 'squash_finished', { await runTrace.event(
commitSha, 'debug',
touchedPaths: mergeResult.touchedPaths, 'squash',
}); 'squash_finished',
{
commitSha,
touchedPaths: mergeResult.touchedPaths,
},
undefined,
Date.now() - squashStartedAt,
);
const memoryFlowSavedActions = stageIndex.workUnits const memoryFlowSavedActions = stageIndex.workUnits
.flatMap((wu) => wu.actions) .flatMap((wu) => wu.actions)
.concat(reconcileActions) .concat(reconcileActions)
@ -2547,6 +2649,7 @@ export class IngestBundleRunner {
// transaction. If this throws, the run fails and no partial index state // transaction. If this throws, the run fails and no partial index state
// survives (thanks to the transactional upsert in applyDiffTransactional). // survives (thanks to the transactional upsert in applyDiffTransactional).
if (commitSha) { if (commitSha) {
const indexSyncStartedAt = Date.now();
// Multi-file squash → omit path so the handler diffs the whole commit // Multi-file squash → omit path so the handler diffs the whole commit
// (a comma-joined pathspec would match nothing and the job would no-op). // (a comma-joined pathspec would match nothing and the job would no-op).
const pathFilter = mergeResult.touchedPaths.length === 1 ? mergeResult.touchedPaths[0] : ''; const pathFilter = mergeResult.touchedPaths.length === 1 ? mergeResult.touchedPaths[0] : '';
@ -2571,6 +2674,14 @@ export class IngestBundleRunner {
); );
} }
} }
await runTrace.event(
'debug',
'index_sync',
'post_squash_index_sync_finished',
{ connectionCount: touchedConnections.length },
undefined,
Date.now() - indexSyncStartedAt,
);
} }
const stage5 = ctx?.startPhase(0.04); const stage5 = ctx?.startPhase(0.04);

View file

@ -0,0 +1,437 @@
import { readdir, readFile } from 'node:fs/promises';
import { join } from 'node:path';
import { z } from 'zod';
export interface IngestProfilePaths {
tracePath: string;
transcriptDir: string;
}
/**
* Post-processor over the ingest trace (`<home>/ingest-traces/<jobId>/trace.jsonl`)
* and per-work-unit tool transcripts. Turns the durations recorded during a run
* into a rolled-up "where did the time go" view. Gated for display by
* `KTX_PROFILE_INGEST`; the durations themselves are always written to the trace.
*/
const traceEventSchema = z
.object({
at: z.string().optional(),
phase: z.string(),
event: z.string(),
durationMs: z.number().optional(),
data: z.record(z.string(), z.unknown()).optional(),
})
.loose();
/** @internal */
export type ProfiledTraceEvent = z.infer<typeof traceEventSchema>;
export interface IngestProfile {
jobId: string;
totalWallMs?: number;
phases: Array<{
phase: string;
totalMs: number;
/** Number of timed (durationMs-bearing) events that contributed to this phase. */
count: number;
}>;
workUnits: Array<{
unitKey: string;
status?: string;
/** Wall-clock for the whole work-unit run (agent loop + validation + git). */
totalMs?: number;
/** Pure `generateText` agent-loop time reported by the runtime. */
agentLoopMs?: number;
/** Summed tool-execution time from the work-unit transcript. */
toolMs?: number;
/** Derived model "thinking" time = agentLoopMs - toolMs (clamped at 0). */
modelMs?: number;
/** Worktree create time. */
createMs?: number;
/** Worktree teardown time. */
cleanupMs?: number;
stepCount?: number;
totalTokens?: number;
}>;
workUnitCount: number;
failedWorkUnitCount: number;
/**
* Plain-language diagnosis plus the raw numbers behind it, so a reader (human
* or coding agent) gets the conclusion without re-deriving it from the tables.
*/
summary: {
/** One-sentence conclusion, e.g. which phase dominated and whether work was model- or tool-bound. */
headline: string;
dominantPhase?: { phase: string; totalMs: number; pctOfWall?: number };
/** Aggregate across all work units, in milliseconds. */
workUnits?: {
count: number;
failed: number;
agentLoopMs: number;
modelMs: number;
toolMs: number;
/** Percent of agent-loop time spent in model generation vs tool execution. */
modelPct?: number;
};
};
}
type IngestWorkUnitTiming = IngestProfile['workUnits'][number];
function asNumber(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}
function asString(value: unknown): string | undefined {
return typeof value === 'string' ? value : undefined;
}
/** @internal */
export function parseTraceEvents(traceText: string): ProfiledTraceEvent[] {
const events: ProfiledTraceEvent[] = [];
for (const line of traceText.split('\n')) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
let json: unknown;
try {
json = JSON.parse(trimmed);
} catch {
continue;
}
const parsed = traceEventSchema.safeParse(json);
if (parsed.success) {
events.push(parsed.data);
}
}
return events;
}
/** @internal */
export function aggregateIngestProfile(input: {
jobId: string;
events: ProfiledTraceEvent[];
toolMsByUnit: Record<string, number>;
}): IngestProfile {
const { jobId, events, toolMsByUnit } = input;
const phaseTotals = new Map<string, { totalMs: number; count: number }>();
const workUnits = new Map<string, IngestWorkUnitTiming>();
const wu = (unitKey: string): IngestWorkUnitTiming => {
let existing = workUnits.get(unitKey);
if (!existing) {
existing = { unitKey };
workUnits.set(unitKey, existing);
}
return existing;
};
let minAt = Number.POSITIVE_INFINITY;
let maxAt = Number.NEGATIVE_INFINITY;
for (const event of events) {
const at = event.at ? Date.parse(event.at) : Number.NaN;
if (!Number.isNaN(at)) {
minAt = Math.min(minAt, at);
maxAt = Math.max(maxAt, at);
}
if (event.durationMs !== undefined) {
const bucket = phaseTotals.get(event.phase) ?? { totalMs: 0, count: 0 };
bucket.totalMs += event.durationMs;
bucket.count += 1;
phaseTotals.set(event.phase, bucket);
}
const data = event.data ?? {};
const unitKey = asString(data.unitKey);
if (unitKey) {
const entry = wu(unitKey);
if (event.event === 'work_unit_executed') {
entry.totalMs = event.durationMs;
entry.agentLoopMs = asNumber(data.agentLoopMs);
entry.stepCount = asNumber(data.stepCount);
entry.totalTokens = asNumber(data.totalTokens);
entry.status = asString(data.status) ?? entry.status;
} else if (event.event === 'work_unit_child_created') {
entry.createMs = event.durationMs;
} else if (event.event === 'work_unit_child_cleanup') {
entry.cleanupMs = event.durationMs;
} else if (event.event === 'work_unit_failed_before_patch') {
entry.status = entry.status ?? 'failed';
}
}
}
for (const [unitKey, entry] of workUnits) {
const toolMs = toolMsByUnit[unitKey];
if (toolMs !== undefined) {
entry.toolMs = toolMs;
if (entry.agentLoopMs !== undefined) {
entry.modelMs = Math.max(0, entry.agentLoopMs - toolMs);
}
} else if (entry.agentLoopMs !== undefined) {
entry.modelMs = entry.agentLoopMs;
}
}
const phases = [...phaseTotals.entries()]
.map(([phase, { totalMs, count }]) => ({ phase, totalMs, count }))
.sort((a, b) => b.totalMs - a.totalMs);
const workUnitList = [...workUnits.values()].sort((a, b) => (b.totalMs ?? 0) - (a.totalMs ?? 0));
const totalWallMs = Number.isFinite(minAt) && Number.isFinite(maxAt) && maxAt >= minAt ? maxAt - minAt : undefined;
const failedWorkUnitCount = workUnitList.filter((entry) => entry.status === 'failed').length;
return {
jobId,
...(totalWallMs !== undefined ? { totalWallMs } : {}),
phases,
workUnits: workUnitList,
workUnitCount: workUnitList.length,
failedWorkUnitCount,
summary: buildSummary(phases, workUnitList, failedWorkUnitCount, totalWallMs),
};
}
function buildSummary(
phases: IngestProfile['phases'],
workUnits: IngestWorkUnitTiming[],
failed: number,
totalWallMs: number | undefined,
): IngestProfile['summary'] {
const dominant = phases[0];
const dominantPhase = dominant
? {
phase: dominant.phase,
totalMs: dominant.totalMs,
...(totalWallMs && totalWallMs > 0
? { pctOfWall: Math.round((dominant.totalMs / totalWallMs) * 100) }
: {}),
}
: undefined;
const agentLoopMs = workUnits.reduce((sum, wu) => sum + (wu.agentLoopMs ?? 0), 0);
const toolMs = workUnits.reduce((sum, wu) => sum + (wu.toolMs ?? 0), 0);
const modelMs = workUnits.reduce((sum, wu) => sum + (wu.modelMs ?? 0), 0);
const workUnitAggregate =
workUnits.length > 0
? {
count: workUnits.length,
failed,
agentLoopMs,
modelMs,
toolMs,
...(agentLoopMs > 0 ? { modelPct: Math.round((modelMs / agentLoopMs) * 100) } : {}),
}
: undefined;
const parts: string[] = [];
if (dominantPhase) {
const pct = dominantPhase.pctOfWall !== undefined ? `, ${dominantPhase.pctOfWall}% of wall time` : '';
parts.push(`Slowest phase: ${dominantPhase.phase} (${formatMs(dominantPhase.totalMs)}${pct})`);
}
if (workUnitAggregate) {
const split =
workUnitAggregate.modelPct !== undefined
? `, ~${workUnitAggregate.modelPct}% model generation vs ~${100 - workUnitAggregate.modelPct}% tools`
: '';
parts.push(
`${workUnitAggregate.count} work unit${workUnitAggregate.count === 1 ? '' : 's'}${
failed > 0 ? ` (${failed} failed)` : ''
}${split}`,
);
}
const headline = parts.length > 0 ? parts.join('. ') + '.' : 'No timed phases recorded.';
return {
headline,
...(dominantPhase ? { dominantPhase } : {}),
...(workUnitAggregate ? { workUnits: workUnitAggregate } : {}),
};
}
/** Read the trace and tool transcripts for a job and aggregate them into a profile. */
export async function readIngestProfile(
jobId: string,
paths: IngestProfilePaths,
): Promise<IngestProfile> {
const traceText = await readFile(paths.tracePath, 'utf-8');
const events = parseTraceEvents(traceText);
const toolMsByUnit = await readToolMsByUnit(paths.transcriptDir);
return aggregateIngestProfile({ jobId, events, toolMsByUnit });
}
async function listTranscriptFiles(dir: string): Promise<string[]> {
// Work-unit keys can contain slashes (e.g. "cards/marketing"), so the runner
// writes nested transcript files (".../cards/marketing.jsonl"). Walk
// recursively and bucket by the `wuKey` field inside each entry rather than
// by file name.
const entries = await readdir(dir, { withFileTypes: true }).catch(() => null);
if (!entries) {
return [];
}
const files: string[] = [];
for (const entry of entries) {
const full = join(dir, entry.name);
if (entry.isDirectory()) {
files.push(...(await listTranscriptFiles(full)));
} else if (entry.isFile() && entry.name.endsWith('.jsonl')) {
files.push(full);
}
}
return files;
}
async function readToolMsByUnit(transcriptDir: string): Promise<Record<string, number>> {
const toolMs: Record<string, number> = {};
for (const file of await listTranscriptFiles(transcriptDir)) {
let text: string;
try {
text = await readFile(file, 'utf-8');
} catch {
continue;
}
for (const line of text.split('\n')) {
const trimmed = line.trim();
if (!trimmed) {
continue;
}
try {
const entry = JSON.parse(trimmed) as { wuKey?: unknown; durationMs?: unknown };
const wuKey = asString(entry.wuKey);
const ms = asNumber(entry.durationMs);
if (wuKey && ms !== undefined) {
toolMs[wuKey] = (toolMs[wuKey] ?? 0) + ms;
}
} catch {
// skip malformed line
}
}
}
return toolMs;
}
function formatMs(ms: number | undefined): string {
if (ms === undefined) {
return '—';
}
if (ms < 1000) {
return `${Math.round(ms)}ms`;
}
const seconds = ms / 1000;
if (seconds < 60) {
return `${seconds.toFixed(1)}s`;
}
const minutes = Math.floor(seconds / 60);
const rem = Math.round(seconds - minutes * 60);
return `${minutes}m ${String(rem).padStart(2, '0')}s`;
}
function formatTokens(tokens: number | undefined): string {
if (tokens === undefined) {
return '—';
}
if (tokens < 1000) {
return String(tokens);
}
return `${(tokens / 1000).toFixed(1)}k`;
}
function pad(value: string, width: number): string {
return value.length >= width ? value : value + ' '.repeat(width - value.length);
}
function padStart(value: string, width: number): string {
return value.length >= width ? value : ' '.repeat(width - value.length) + value;
}
/** Render a human-readable profile table for stderr / the admin command. */
export function formatIngestProfile(profile: IngestProfile, options: { topWorkUnits?: number } = {}): string {
const topWorkUnits = options.topWorkUnits ?? 10;
const lines: string[] = [];
lines.push(`ktx ingest profile — job ${profile.jobId}`);
if (profile.totalWallMs !== undefined) {
lines.push(` total wall time: ${formatMs(profile.totalWallMs)}`);
}
lines.push(` ${profile.summary.headline}`);
const wall = profile.totalWallMs;
lines.push('');
lines.push(' Phase breakdown (by total duration):');
if (profile.phases.length === 0) {
lines.push(' (no timed phases recorded)');
}
for (const phase of profile.phases) {
const pct = wall && wall > 0 ? `(${((phase.totalMs / wall) * 100).toFixed(1)}%)` : '';
lines.push(
` ${pad(phase.phase, 22)}${padStart(formatMs(phase.totalMs), 9)} ${padStart(pct, 8)} ${padStart(
String(phase.count),
4,
)} event${phase.count === 1 ? '' : 's'}`,
);
}
if (profile.workUnits.length > 0) {
lines.push('');
lines.push(` Work units (top ${Math.min(topWorkUnits, profile.workUnits.length)} slowest):`);
lines.push(
` ${pad('unitKey', 30)}${padStart('total', 9)}${padStart('model', 9)}${padStart('tool', 9)}${padStart(
'steps',
8,
)}${padStart('tokens', 9)} status`,
);
for (const entry of profile.workUnits.slice(0, topWorkUnits)) {
const steps = entry.stepCount !== undefined ? String(entry.stepCount) : '—';
lines.push(
` ${pad(entry.unitKey.slice(0, 30), 30)}${padStart(formatMs(entry.totalMs), 9)}${padStart(
formatMs(entry.modelMs),
9,
)}${padStart(formatMs(entry.toolMs), 9)}${padStart(steps, 8)}${padStart(
formatTokens(entry.totalTokens),
9,
)} ${entry.status ?? '—'}`,
);
}
lines.push(
` (${profile.workUnitCount} work unit${profile.workUnitCount === 1 ? '' : 's'} total; ${
profile.failedWorkUnitCount
} failed)`,
);
}
return `${lines.join('\n')}\n`;
}
/**
* Machine-readable rendering for coding agents: the full structured profile
* (raw milliseconds and token counts, stable keys) as a single JSON object
* under a stable marker line so it is easy to locate and parse in stderr.
*/
export function formatIngestProfileJson(profile: IngestProfile): string {
return `ktx ingest profile (json)\n${JSON.stringify(profile, null, 2)}\n`;
}
export type IngestProfileMode = 'off' | 'table' | 'json';
/**
* Resolve how (and whether) to emit the ingest profile, from the
* `ingest.profile` config value and the `KTX_PROFILE_INGEST` env var. Either
* source may request `json` (raw, agent-friendly) or a human `table`; `json`
* wins if either asks for it.
*/
export function resolveIngestProfileMode(
configValue: boolean | 'json' | undefined,
env: NodeJS.ProcessEnv = process.env,
): IngestProfileMode {
const envValue = env.KTX_PROFILE_INGEST;
if (configValue === 'json' || envValue === 'json') {
return 'json';
}
const wantsTable =
configValue === true || envValue === '1' || envValue === 'true' || envValue === 'table';
return wantsTable ? 'table' : 'off';
}

View file

@ -26,16 +26,52 @@ function patchFileName(unitIndex: number, unitKey: string): string {
export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Promise<WorkUnitOutcome> { export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Promise<WorkUnitOutcome> {
const sessionKey = `${input.trace.context.jobId}-${input.workUnit.unitKey}`; const sessionKey = `${input.trace.context.jobId}-${input.workUnit.unitKey}`;
let cleanupOutcome: SessionOutcome = 'crash'; let cleanupOutcome: SessionOutcome = 'crash';
const createStartedAt = Date.now();
const child = await input.sessionWorktreeService.create(sessionKey, input.ingestionBaseSha); const child = await input.sessionWorktreeService.create(sessionKey, input.ingestionBaseSha);
await input.trace.event('debug', 'work_unit', 'work_unit_child_created', { await input.trace.event(
unitKey: input.workUnit.unitKey, 'debug',
unitIndex: input.unitIndex, 'work_unit',
worktreePath: child.workdir, 'work_unit_child_created',
baseSha: input.ingestionBaseSha, {
}); unitKey: input.workUnit.unitKey,
unitIndex: input.unitIndex,
worktreePath: child.workdir,
baseSha: input.ingestionBaseSha,
},
undefined,
Date.now() - createStartedAt,
);
try { try {
const runStartedAt = Date.now();
const outcome = await input.run(child); const outcome = await input.run(child);
await input.trace.event(
'debug',
'work_unit',
'work_unit_executed',
{
unitKey: input.workUnit.unitKey,
unitIndex: input.unitIndex,
status: outcome.status,
...(outcome.metrics
? {
agentLoopMs: outcome.metrics.totalMs,
stepCount: outcome.metrics.stepCount,
...(outcome.metrics.usage.inputTokens !== undefined
? { inputTokens: outcome.metrics.usage.inputTokens }
: {}),
...(outcome.metrics.usage.outputTokens !== undefined
? { outputTokens: outcome.metrics.usage.outputTokens }
: {}),
...(outcome.metrics.usage.totalTokens !== undefined
? { totalTokens: outcome.metrics.usage.totalTokens }
: {}),
}
: {}),
},
undefined,
Date.now() - runStartedAt,
);
if (outcome.status !== 'success') { if (outcome.status !== 'success') {
cleanupOutcome = 'success'; cleanupOutcome = 'success';
await input.trace.event('error', 'work_unit', 'work_unit_failed_before_patch', { await input.trace.event('error', 'work_unit', 'work_unit_failed_before_patch', {
@ -75,11 +111,19 @@ export async function runIsolatedWorkUnit(input: RunIsolatedWorkUnitInput): Prom
cleanupOutcome = 'success'; cleanupOutcome = 'success';
throw error; throw error;
} finally { } finally {
const cleanupStartedAt = Date.now();
await input.sessionWorktreeService.cleanup(child, cleanupOutcome); await input.sessionWorktreeService.cleanup(child, cleanupOutcome);
await input.trace.event('trace', 'work_unit', 'work_unit_child_cleanup', { await input.trace.event(
unitKey: input.workUnit.unitKey, 'trace',
outcome: cleanupOutcome, 'work_unit',
worktreePath: child.workdir, 'work_unit_child_cleanup',
}); {
unitKey: input.workUnit.unitKey,
outcome: cleanupOutcome,
worktreePath: child.workdir,
},
undefined,
Date.now() - cleanupStartedAt,
);
} }
} }

View file

@ -716,6 +716,7 @@ export function createLocalBundleIngestRuntime(
workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency, workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency,
workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget, workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget,
workUnitFailureMode: options.project.config.ingest.workUnits.failureMode, workUnitFailureMode: options.project.config.ingest.workUnits.failureMode,
profileIngest: options.project.config.ingest.profile,
ingestTraceLevel: ingestTraceLevelFromEnv(), ingestTraceLevel: ingestTraceLevelFromEnv(),
}, },
skillsRegistry: new SkillsRegistryService({ skillsDir, logger }), skillsRegistry: new SkillsRegistryService({ skillsDir, logger }),

View file

@ -144,6 +144,8 @@ interface IngestSettingsPort {
workUnitMaxConcurrency?: number; workUnitMaxConcurrency?: number;
workUnitStepBudget?: number; workUnitStepBudget?: number;
workUnitFailureMode?: 'abort' | 'continue'; workUnitFailureMode?: 'abort' | 'continue';
/** Print a timing breakdown to stderr at the end of each run (config-driven; see also KTX_PROFILE_INGEST). `'json'` emits the raw structured profile. */
profileIngest?: boolean | 'json';
ingestTraceLevel?: IngestTraceLevel; ingestTraceLevel?: IngestTraceLevel;
} }

View file

@ -1,5 +1,5 @@
import type { KtxModelRole } from '../../../llm/types.js'; import type { KtxModelRole } from '../../../llm/types.js';
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../../context/llm/runtime-port.js'; import type { AgentRunnerPort, KtxRuntimeToolSet, RunLoopMetrics } from '../../../context/llm/runtime-port.js';
import type { CaptureSession, MemoryAction } from '../../../context/memory/types.js'; import type { CaptureSession, MemoryAction } from '../../../context/memory/types.js';
import { listTouchedSlSources, type TouchedSlSource } from '../../../context/tools/touched-sl-sources.js'; import { listTouchedSlSources, type TouchedSlSource } from '../../../context/tools/touched-sl-sources.js';
import type { WorkUnit } from '../types.js'; import type { WorkUnit } from '../types.js';
@ -44,6 +44,8 @@ export interface WorkUnitOutcome {
patchPath?: string; patchPath?: string;
patchTouchedPaths?: string[]; patchTouchedPaths?: string[];
childWorktreePath?: string; childWorktreePath?: string;
/** Timing and token metrics for the work-unit agent loop, used for ingest profiling. */
metrics?: RunLoopMetrics;
} }
export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit): Promise<WorkUnitOutcome> { export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit): Promise<WorkUnitOutcome> {
@ -125,6 +127,7 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit)
touchedSlSources: [], touchedSlSources: [],
slDisallowed: wu.slDisallowed, slDisallowed: wu.slDisallowed,
slDisallowedReason: wu.slDisallowedReason, slDisallowedReason: wu.slDisallowedReason,
...(runResult.metrics ? { metrics: runResult.metrics } : {}),
}; };
}; };
@ -162,5 +165,6 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit)
touchedSlSources: touched, touchedSlSources: touched,
slDisallowed: wu.slDisallowed, slDisallowed: wu.slDisallowed,
slDisallowedReason: wu.slDisallowedReason, slDisallowedReason: wu.slDisallowedReason,
...(runResult.metrics ? { metrics: runResult.metrics } : {}),
}; };
} }

View file

@ -1,4 +1,4 @@
import type { AgentRunnerPort, KtxRuntimeToolSet } from '../../../context/llm/runtime-port.js'; import type { AgentRunnerPort, KtxRuntimeToolSet, RunLoopMetrics } from '../../../context/llm/runtime-port.js';
import type { KtxModelRole } from '../../../llm/types.js'; import type { KtxModelRole } from '../../../llm/types.js';
import type { EvictionUnit } from '../types.js'; import type { EvictionUnit } from '../types.js';
import type { StageIndex } from './stage-index.types.js'; import type { StageIndex } from './stage-index.types.js';
@ -23,6 +23,7 @@ export interface ReconciliationOutcome {
skipped: boolean; skipped: boolean;
stopReason?: 'budget' | 'natural' | 'error'; stopReason?: 'budget' | 'natural' | 'error';
error?: Error; error?: Error;
metrics?: RunLoopMetrics;
} }
export async function runReconciliationStage4(ctx: ReconciliationContext): Promise<ReconciliationOutcome> { export async function runReconciliationStage4(ctx: ReconciliationContext): Promise<ReconciliationOutcome> {
@ -40,5 +41,5 @@ export async function runReconciliationStage4(ctx: ReconciliationContext): Promi
telemetryTags: { operationName: 'ingest-bundle-reconcile', source: ctx.sourceKey, jobId: ctx.jobId }, telemetryTags: { operationName: 'ingest-bundle-reconcile', source: ctx.sourceKey, jobId: ctx.jobId },
onStepFinish: ctx.onStepFinish, onStepFinish: ctx.onStepFinish,
}); });
return { skipped: false, stopReason: run.stopReason, error: run.error }; return { skipped: false, stopReason: run.stopReason, error: run.error, ...(run.metrics ? { metrics: run.metrics } : {}) };
} }

View file

@ -81,8 +81,13 @@ export function wrapToolsWithLogger<T extends KtxRuntimeToolSet>(
return wrapped as T; return wrapped as T;
} }
// Fire-and-forget appends are intentional (the agent hot path must never block
// or fail on logging), but readers like the ingest profiler need to know when
// the writes have settled. Track in-flight appends so a consumer can flush.
const pendingWrites = new Set<Promise<void>>();
function appendEntry(path: string, entry: ToolCallLogEntry): void { function appendEntry(path: string, entry: ToolCallLogEntry): void {
void (async () => { const write = (async () => {
try { try {
await mkdir(dirname(path), { recursive: true }); await mkdir(dirname(path), { recursive: true });
await appendFile(path, `${safeStringify(entry)}\n`, 'utf-8'); await appendFile(path, `${safeStringify(entry)}\n`, 'utf-8');
@ -90,6 +95,37 @@ function appendEntry(path: string, entry: ToolCallLogEntry): void {
// best-effort // best-effort
} }
})(); })();
pendingWrites.add(write);
void write.finally(() => pendingWrites.delete(write));
}
/**
* Await all in-flight tool-call log writes (best-effort, bounded by `timeoutMs`
* so it can never hang a caller). Lets readers such as the ingest profiler see
* complete transcripts despite the fire-and-forget append design.
*/
export async function flushToolCallLogs(timeoutMs = 5000): Promise<void> {
const pending = [...pendingWrites];
if (pending.length === 0) {
return;
}
const settled = Promise.allSettled(pending).then(() => undefined);
if (timeoutMs <= 0) {
await settled;
return;
}
let timer: ReturnType<typeof setTimeout> | undefined;
const timeout = new Promise<void>((resolve) => {
timer = setTimeout(resolve, timeoutMs);
timer.unref?.();
});
try {
await Promise.race([settled, timeout]);
} finally {
if (timer) {
clearTimeout(timer);
}
}
} }
function safeStringify(v: unknown): string { function safeStringify(v: unknown): string {

View file

@ -9,6 +9,7 @@ import type {
KtxGenerateObjectInput, KtxGenerateObjectInput,
KtxGenerateTextInput, KtxGenerateTextInput,
KtxLlmRuntimePort, KtxLlmRuntimePort,
LlmTokenUsage,
RunLoopParams, RunLoopParams,
RunLoopResult, RunLoopResult,
} from './runtime-port.js'; } from './runtime-port.js';
@ -17,6 +18,23 @@ interface AgentTelemetryPort {
createTelemetry(tags: Record<string, string>): TelemetrySettings; createTelemetry(tags: Record<string, string>): TelemetrySettings;
} }
interface MaybeUsage {
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;
}
function toLlmTokenUsage(usage: MaybeUsage | undefined): LlmTokenUsage {
if (!usage) {
return {};
}
return {
...(usage.inputTokens !== undefined ? { inputTokens: usage.inputTokens } : {}),
...(usage.outputTokens !== undefined ? { outputTokens: usage.outputTokens } : {}),
...(usage.totalTokens !== undefined ? { totalTokens: usage.totalTokens } : {}),
};
}
export interface AiSdkKtxLlmRuntimeDeps { export interface AiSdkKtxLlmRuntimeDeps {
llmProvider: KtxLlmProvider; llmProvider: KtxLlmProvider;
telemetry?: AgentTelemetryPort; telemetry?: AgentTelemetryPort;
@ -48,6 +66,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
model, model,
}); });
const split = splitKtxSystemMessages(built.messages); const split = splitKtxSystemMessages(built.messages);
const startedAt = Date.now();
const result = await generateText({ const result = await generateText({
model, model,
temperature: input.temperature ?? 0, temperature: input.temperature ?? 0,
@ -62,6 +81,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
} }
: {}), : {}),
}); });
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: toLlmTokenUsage(result.totalUsage ?? result.usage) });
if (typeof result.text !== 'string') { if (typeof result.text !== 'string') {
throw new Error('KTX LLM text generation returned no text'); throw new Error('KTX LLM text generation returned no text');
} }
@ -80,6 +100,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
model, model,
}); });
const split = splitKtxSystemMessages(built.messages); const split = splitKtxSystemMessages(built.messages);
const startedAt = Date.now();
const result = await generateText({ const result = await generateText({
model, model,
temperature: input.temperature ?? 0, temperature: input.temperature ?? 0,
@ -95,6 +116,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
: {}), : {}),
output: Output.object({ schema: input.schema as unknown as FlexibleSchema<TOutput> }), output: Output.object({ schema: input.schema as unknown as FlexibleSchema<TOutput> }),
}); });
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: toLlmTokenUsage(result.totalUsage ?? result.usage) });
if (result.output == null) { if (result.output == null) {
throw new Error('KTX LLM object generation returned no output'); throw new Error('KTX LLM object generation returned no output');
} }
@ -103,6 +125,8 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> { async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> {
let stepIndex = 0; let stepIndex = 0;
const startedAt = Date.now();
const stepBoundariesMs: number[] = [];
try { try {
const model = this.deps.llmProvider.getModel(params.modelRole); const model = this.deps.llmProvider.getModel(params.modelRole);
const tools = createAiSdkToolSet(params.toolSet); const tools = createAiSdkToolSet(params.toolSet);
@ -128,7 +152,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
}), }),
); );
await generateText({ const result = await generateText({
model, model,
temperature: 0, temperature: 0,
stopWhen: stepCountIs(params.stepBudget), stopWhen: stepCountIs(params.stepBudget),
@ -141,6 +165,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
tools: built.tools as ToolSet, tools: built.tools as ToolSet,
onStepFinish: async () => { onStepFinish: async () => {
stepIndex += 1; stepIndex += 1;
stepBoundariesMs.push(Date.now() - startedAt);
if (!params.onStepFinish) { if (!params.onStepFinish) {
return; return;
} }
@ -155,11 +180,23 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
} }
}, },
}); });
return { stopReason: 'natural' }; return {
stopReason: 'natural',
metrics: {
totalMs: Date.now() - startedAt,
stepCount: stepIndex,
stepBoundariesMs,
usage: toLlmTokenUsage(result.totalUsage ?? result.usage),
},
};
} catch (error) { } catch (error) {
const err = error instanceof Error ? error : new Error(String(error)); const err = error instanceof Error ? error : new Error(String(error));
this.logger.warn(`[agent-runner] loop failed: ${err.message}`); this.logger.warn(`[agent-runner] loop failed: ${err.message}`);
return { stopReason: 'error', error: err }; return {
stopReason: 'error',
error: err,
metrics: { totalMs: Date.now() - startedAt, stepCount: stepIndex, stepBoundariesMs, usage: {} },
};
} }
} }
} }

View file

@ -15,6 +15,7 @@ import type {
KtxGenerateTextInput, KtxGenerateTextInput,
KtxLlmRuntimePort, KtxLlmRuntimePort,
KtxRuntimeToolSet, KtxRuntimeToolSet,
LlmTokenUsage,
RunLoopParams, RunLoopParams,
RunLoopResult, RunLoopResult,
RunLoopStopReason, RunLoopStopReason,
@ -22,6 +23,20 @@ import type {
type QueryFn = (params: Parameters<typeof defaultQuery>[0]) => AsyncIterable<SDKMessage>; type QueryFn = (params: Parameters<typeof defaultQuery>[0]) => AsyncIterable<SDKMessage>;
function claudeTokenUsage(result: SDKResultMessage): LlmTokenUsage {
const usage = (result as { usage?: { input_tokens?: number; output_tokens?: number } }).usage;
if (!usage) {
return {};
}
const { input_tokens: inputTokens, output_tokens: outputTokens } = usage;
const totalTokens = inputTokens !== undefined && outputTokens !== undefined ? inputTokens + outputTokens : undefined;
return {
...(inputTokens !== undefined ? { inputTokens } : {}),
...(outputTokens !== undefined ? { outputTokens } : {}),
...(totalTokens !== undefined ? { totalTokens } : {}),
};
}
export interface ClaudeCodeKtxLlmRuntimeDeps { export interface ClaudeCodeKtxLlmRuntimeDeps {
projectDir: string; projectDir: string;
modelSlots: { default: string } & Partial<Record<string, string>>; modelSlots: { default: string } & Partial<Record<string, string>>;
@ -236,6 +251,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
maxTurns: 1, maxTurns: 1,
tools: input.tools, tools: input.tools,
}); });
const startedAt = Date.now();
const result = await collectResult({ const result = await collectResult({
query: this.runQuery, query: this.runQuery,
prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'), prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'),
@ -243,6 +259,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
allowedToolIds: new Set(mcpToolIds(input.tools ?? {})), allowedToolIds: new Set(mcpToolIds(input.tools ?? {})),
expectedMcpServerNames: expectedMcpServerNames(input.tools), expectedMcpServerNames: expectedMcpServerNames(input.tools),
}); });
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: claudeTokenUsage(result) });
const error = resultError(result); const error = resultError(result);
if (error) { if (error) {
throw error; throw error;
@ -271,6 +288,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
}), }),
outputFormat: { type: 'json_schema' as const, schema: jsonSchema(input.schema as z.ZodType) }, outputFormat: { type: 'json_schema' as const, schema: jsonSchema(input.schema as z.ZodType) },
}; };
const startedAt = Date.now();
const result = await collectResult({ const result = await collectResult({
query: this.runQuery, query: this.runQuery,
prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'), prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'),
@ -278,6 +296,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
allowedToolIds: new Set([...mcpToolIds(input.tools ?? {}), STRUCTURED_OUTPUT_TOOL_NAME]), allowedToolIds: new Set([...mcpToolIds(input.tools ?? {}), STRUCTURED_OUTPUT_TOOL_NAME]),
expectedMcpServerNames: expectedMcpServerNames(input.tools), expectedMcpServerNames: expectedMcpServerNames(input.tools),
}); });
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: claudeTokenUsage(result) });
const error = resultError(result); const error = resultError(result);
if (error) { if (error) {
throw error; throw error;
@ -290,6 +309,8 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> { async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> {
let stepIndex = 0; let stepIndex = 0;
const startedAt = Date.now();
const stepBoundariesMs: number[] = [];
try { try {
const options = baseOptions({ const options = baseOptions({
projectDir: this.deps.projectDir, projectDir: this.deps.projectDir,
@ -306,6 +327,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
expectedMcpServerNames: expectedMcpServerNames(params.toolSet), expectedMcpServerNames: expectedMcpServerNames(params.toolSet),
onAssistantTurn: async () => { onAssistantTurn: async () => {
stepIndex += 1; stepIndex += 1;
stepBoundariesMs.push(Date.now() - startedAt);
if (!params.onStepFinish) { if (!params.onStepFinish) {
return; return;
} }
@ -322,10 +344,23 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
}); });
const stopReason = mapClaudeCodeStopReason(result); const stopReason = mapClaudeCodeStopReason(result);
const error = resultError(result); const error = resultError(result);
return { stopReason, ...(stopReason === 'error' && error ? { error } : {}) }; return {
stopReason,
...(stopReason === 'error' && error ? { error } : {}),
metrics: {
totalMs: Date.now() - startedAt,
stepCount: stepIndex,
stepBoundariesMs,
usage: claudeTokenUsage(result),
},
};
} catch (error) { } catch (error) {
const err = error instanceof Error ? error : new Error(String(error)); const err = error instanceof Error ? error : new Error(String(error));
return { stopReason: 'error', error: err }; return {
stopReason: 'error',
error: err,
metrics: { totalMs: Date.now() - startedAt, stepCount: stepIndex, stepBoundariesMs, usage: {} },
};
} }
} }
} }

View file

@ -23,6 +23,24 @@ export interface RunLoopStepInfo {
stepBudget: number; stepBudget: number;
} }
export interface LlmTokenUsage {
inputTokens?: number;
outputTokens?: number;
totalTokens?: number;
}
/** Timing and token metrics for a multi-step agent loop, used for ingest profiling. */
export interface RunLoopMetrics {
/** Wall-clock time around the whole `generateText` call, in milliseconds. */
totalMs: number;
/** Aggregate token usage across all steps. */
usage: LlmTokenUsage;
/** Number of agent steps (model round-trips) that actually ran. */
stepCount: number;
/** Wall-clock offset (ms from loop start) at which each step finished. */
stepBoundariesMs: number[];
}
export interface RunLoopParams { export interface RunLoopParams {
modelRole: KtxModelRole; modelRole: KtxModelRole;
systemPrompt: string; systemPrompt: string;
@ -36,6 +54,7 @@ export interface RunLoopParams {
export interface RunLoopResult { export interface RunLoopResult {
stopReason: RunLoopStopReason; stopReason: RunLoopStopReason;
error?: Error; error?: Error;
metrics?: RunLoopMetrics;
} }
export interface KtxGenerateTextInput { export interface KtxGenerateTextInput {
@ -44,6 +63,7 @@ export interface KtxGenerateTextInput {
system?: string; system?: string;
tools?: KtxRuntimeToolSet; tools?: KtxRuntimeToolSet;
temperature?: number; temperature?: number;
onMetrics?: (metrics: { totalMs: number; usage: LlmTokenUsage }) => void;
} }
export interface KtxGenerateObjectInput<TOutput, TSchema extends z.ZodType<TOutput>> { export interface KtxGenerateObjectInput<TOutput, TSchema extends z.ZodType<TOutput>> {
@ -53,6 +73,7 @@ export interface KtxGenerateObjectInput<TOutput, TSchema extends z.ZodType<TOutp
tools?: KtxRuntimeToolSet; tools?: KtxRuntimeToolSet;
temperature?: number; temperature?: number;
schema: TSchema; schema: TSchema;
onMetrics?: (metrics: { totalMs: number; usage: LlmTokenUsage }) => void;
} }
export interface KtxLlmRuntimePort { export interface KtxLlmRuntimePort {

View file

@ -110,6 +110,12 @@ const ingestSchema = z
.prefault({ backend: 'none' }) .prefault({ backend: 'none' })
.describe('Embedding configuration used when ingest adapters need to embed documents.'), .describe('Embedding configuration used when ingest adapters need to embed documents.'),
workUnits: workUnitsSchema.prefault({}).describe('Concurrency and failure handling for ingest work units.'), workUnits: workUnitsSchema.prefault({}).describe('Concurrency and failure handling for ingest work units.'),
profile: z
.union([z.boolean(), z.literal('json')])
.default(false)
.describe(
'Print a timing breakdown to stderr at the end of each ingest run. `true` prints a human table; `"json"` prints the raw structured profile for coding agents; `false` disables it. Equivalent to the KTX_PROFILE_INGEST environment variable (`1`/`true`/`json`).',
),
}) })
.describe('Ingest pipeline configuration: adapters, embeddings, and work-unit policy.'); .describe('Ingest pipeline configuration: adapters, embeddings, and work-unit policy.');

View file

@ -0,0 +1,247 @@
import { mkdir, mkdtemp, writeFile } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, describe, expect, it } from 'vitest';
import {
aggregateIngestProfile,
formatIngestProfile,
formatIngestProfileJson,
type IngestProfilePaths,
parseTraceEvents,
readIngestProfile,
resolveIngestProfileMode,
type ProfiledTraceEvent,
} from '../../../src/context/ingest/ingest-profile.js';
import { rm } from 'node:fs/promises';
function profilePaths(projectDir: string, jobId: string): IngestProfilePaths {
return {
tracePath: join(projectDir, '.ktx', 'ingest-traces', jobId, 'trace.jsonl'),
transcriptDir: join(projectDir, '.ktx', 'ingest-transcripts', jobId),
};
}
function traceLine(event: Partial<ProfiledTraceEvent> & { phase: string; event: string }): string {
return JSON.stringify({ schemaVersion: 1, level: 'debug', ...event });
}
describe('parseTraceEvents', () => {
it('parses valid JSONL lines and skips blank and malformed ones', () => {
const text = [
traceLine({ at: '2026-05-30T00:00:00.000Z', phase: 'fetch', event: 'fetch_finished', durationMs: 100 }),
'',
'{ not json',
traceLine({ phase: 'diff', event: 'compute_diff_set_finished', durationMs: 5 }),
].join('\n');
const events = parseTraceEvents(text);
expect(events).toHaveLength(2);
expect(events[0].phase).toBe('fetch');
expect(events[1].event).toBe('compute_diff_set_finished');
});
});
describe('aggregateIngestProfile', () => {
it('sums durations per phase and sorts by total descending', () => {
const events = parseTraceEvents(
[
traceLine({ phase: 'fetch', event: 'fetch_finished', durationMs: 1000 }),
traceLine({ phase: 'work_unit', event: 'work_unit_executed', durationMs: 5000, data: { unitKey: 'a' } }),
traceLine({ phase: 'work_unit', event: 'work_unit_executed', durationMs: 3000, data: { unitKey: 'b' } }),
traceLine({ phase: 'diff', event: 'compute_diff_set_finished', durationMs: 50 }),
].join('\n'),
);
const profile = aggregateIngestProfile({ jobId: 'job-1', events, toolMsByUnit: {} });
expect(profile.phases.map((p) => p.phase)).toEqual(['work_unit', 'fetch', 'diff']);
expect(profile.phases[0]).toEqual({ phase: 'work_unit', totalMs: 8000, count: 2 });
});
it('builds per-work-unit rows and derives model time from agent loop minus tool time', () => {
const events = parseTraceEvents(
[
traceLine({
phase: 'work_unit',
event: 'work_unit_child_created',
durationMs: 200,
data: { unitKey: 'cards/users' },
}),
traceLine({
phase: 'work_unit',
event: 'work_unit_executed',
durationMs: 12000,
data: { unitKey: 'cards/users', status: 'success', agentLoopMs: 10000, stepCount: 12, totalTokens: 48000 },
}),
traceLine({
phase: 'work_unit',
event: 'work_unit_child_cleanup',
durationMs: 80,
data: { unitKey: 'cards/users' },
}),
].join('\n'),
);
const profile = aggregateIngestProfile({ jobId: 'job-1', events, toolMsByUnit: { 'cards/users': 2500 } });
expect(profile.workUnitCount).toBe(1);
const wu = profile.workUnits[0];
expect(wu).toMatchObject({
unitKey: 'cards/users',
status: 'success',
totalMs: 12000,
agentLoopMs: 10000,
toolMs: 2500,
modelMs: 7500,
createMs: 200,
cleanupMs: 80,
stepCount: 12,
totalTokens: 48000,
});
});
it('counts failed work units and tolerates missing tool transcripts', () => {
const events = parseTraceEvents(
[
traceLine({
phase: 'work_unit',
event: 'work_unit_executed',
durationMs: 4000,
data: { unitKey: 'wu-ok', status: 'success', agentLoopMs: 3800 },
}),
traceLine({
phase: 'work_unit',
event: 'work_unit_executed',
durationMs: 1000,
data: { unitKey: 'wu-bad', status: 'failed', agentLoopMs: 900 },
}),
].join('\n'),
);
const profile = aggregateIngestProfile({ jobId: 'job-1', events, toolMsByUnit: {} });
expect(profile.failedWorkUnitCount).toBe(1);
// No tool transcript → model time falls back to the full agent-loop time.
expect(profile.workUnits.find((w) => w.unitKey === 'wu-ok')?.modelMs).toBe(3800);
});
it('derives total wall time from the first and last event timestamps', () => {
const events = parseTraceEvents(
[
traceLine({ at: '2026-05-30T00:00:00.000Z', phase: 'fetch', event: 'fetch_started' }),
traceLine({ at: '2026-05-30T00:01:30.000Z', phase: 'run', event: 'ingest_finished' }),
].join('\n'),
);
const profile = aggregateIngestProfile({ jobId: 'job-1', events, toolMsByUnit: {} });
expect(profile.totalWallMs).toBe(90_000);
});
});
describe('formatIngestProfile', () => {
it('renders phase breakdown and work-unit rows', () => {
const events = parseTraceEvents(
[
traceLine({ at: '2026-05-30T00:00:00.000Z', phase: 'work_unit', event: 'work_unit_executed', durationMs: 8000, data: { unitKey: 'cards/users', status: 'success', agentLoopMs: 8000, stepCount: 10, totalTokens: 12000 } }),
traceLine({ at: '2026-05-30T00:00:10.000Z', phase: 'reconciliation', event: 'reconciliation_executed', durationMs: 2000 }),
].join('\n'),
);
const profile = aggregateIngestProfile({ jobId: 'job-xyz', events, toolMsByUnit: { 'cards/users': 1000 } });
const text = formatIngestProfile(profile);
expect(text).toContain('job-xyz');
expect(text).toContain('Phase breakdown');
expect(text).toContain('work_unit');
expect(text).toContain('reconciliation');
expect(text).toContain('cards/users');
expect(text).toContain('success');
});
});
describe('readIngestProfile', () => {
const created: string[] = [];
afterEach(async () => {
for (const dir of created.splice(0)) {
await rm(dir, { recursive: true, force: true });
}
});
it('joins nested tool transcripts to work units by wuKey', async () => {
const projectDir = await mkdtemp(join(tmpdir(), 'ktx-profile-'));
created.push(projectDir);
const jobId = 'job-nested';
const paths = profilePaths(projectDir, jobId);
await mkdir(join(paths.transcriptDir, 'cards'), { recursive: true });
await mkdir(join(paths.tracePath, '..'), { recursive: true });
await writeFile(
paths.tracePath,
[
JSON.stringify({
phase: 'work_unit',
event: 'work_unit_executed',
durationMs: 10000,
data: { unitKey: 'cards/marketing', status: 'success', agentLoopMs: 9000, stepCount: 12 },
}),
].join('\n'),
'utf-8',
);
// Work-unit key has a slash → transcript lives at cards/marketing.jsonl.
await writeFile(
join(paths.transcriptDir, 'cards', 'marketing.jsonl'),
[
JSON.stringify({ wuKey: 'cards/marketing', toolName: 'sl_write', durationMs: 2000, input: {} }),
JSON.stringify({ wuKey: 'cards/marketing', toolName: 'sl_validate', durationMs: 1000, input: {} }),
].join('\n'),
'utf-8',
);
const profile = await readIngestProfile(jobId, paths);
const wu = profile.workUnits.find((entry) => entry.unitKey === 'cards/marketing');
expect(wu?.toolMs).toBe(3000);
expect(wu?.modelMs).toBe(6000);
});
});
describe('resolveIngestProfileMode', () => {
it('reads the table/json/off mode from the env var', () => {
expect(resolveIngestProfileMode(undefined, { KTX_PROFILE_INGEST: '1' })).toBe('table');
expect(resolveIngestProfileMode(undefined, { KTX_PROFILE_INGEST: 'true' })).toBe('table');
expect(resolveIngestProfileMode(undefined, { KTX_PROFILE_INGEST: 'json' })).toBe('json');
expect(resolveIngestProfileMode(undefined, { KTX_PROFILE_INGEST: '0' })).toBe('off');
expect(resolveIngestProfileMode(undefined, {})).toBe('off');
});
it('reads the mode from the config value', () => {
expect(resolveIngestProfileMode(true, {})).toBe('table');
expect(resolveIngestProfileMode('json', {})).toBe('json');
expect(resolveIngestProfileMode(false, {})).toBe('off');
});
it('lets either source request json (json wins)', () => {
expect(resolveIngestProfileMode(true, { KTX_PROFILE_INGEST: 'json' })).toBe('json');
expect(resolveIngestProfileMode('json', { KTX_PROFILE_INGEST: '1' })).toBe('json');
});
});
describe('summary and JSON output', () => {
function profileWithReconcileDominant() {
const events = parseTraceEvents(
[
traceLine({ at: '2026-05-30T00:00:00.000Z', phase: 'work_unit', event: 'work_unit_executed', durationMs: 10000, data: { unitKey: 'a', status: 'success', agentLoopMs: 10000, stepCount: 12, totalTokens: 40000 } }),
traceLine({ at: '2026-05-30T00:01:40.000Z', phase: 'reconciliation', event: 'reconciliation_executed', durationMs: 90000 }),
].join('\n'),
);
return aggregateIngestProfile({ jobId: 'job-sum', events, toolMsByUnit: { a: 2000 } });
}
it('produces a headline naming the dominant phase and the model/tool split', () => {
const profile = profileWithReconcileDominant();
expect(profile.summary.dominantPhase?.phase).toBe('reconciliation');
expect(profile.summary.workUnits).toMatchObject({ count: 1, agentLoopMs: 10000, toolMs: 2000, modelMs: 8000, modelPct: 80 });
expect(profile.summary.headline).toContain('reconciliation');
expect(profile.summary.headline).toContain('80%');
});
it('emits raw structured JSON with stable keys for agents', () => {
const profile = profileWithReconcileDominant();
const text = formatIngestProfileJson(profile);
expect(text).toContain('ktx ingest profile (json)');
const json = JSON.parse(text.slice(text.indexOf('{')));
expect(json.jobId).toBe('job-sum');
expect(json.summary.headline).toEqual(expect.any(String));
// Raw milliseconds, not human-formatted strings.
expect(json.workUnits[0].agentLoopMs).toBe(10000);
expect(json.phases[0].totalMs).toBe(90000);
});
});

View file

@ -299,6 +299,7 @@ describe('createLocalBundleIngestRuntime', () => {
'ingestTraceLevel', 'ingestTraceLevel',
'memoryIngestionModel', 'memoryIngestionModel',
'probeRowCount', 'probeRowCount',
'profileIngest',
'workUnitFailureMode', 'workUnitFailureMode',
'workUnitMaxConcurrency', 'workUnitMaxConcurrency',
'workUnitStepBudget', 'workUnitStepBudget',

View file

@ -0,0 +1,54 @@
import { mkdtemp, readFile, rm } from 'node:fs/promises';
import { tmpdir } from 'node:os';
import { join } from 'node:path';
import { afterEach, describe, expect, it } from 'vitest';
import { z } from 'zod';
import { flushToolCallLogs, wrapToolsWithLogger } from '../../../../src/context/ingest/tools/tool-call-logger.js';
describe('wrapToolsWithLogger + flushToolCallLogs', () => {
const dirs: string[] = [];
afterEach(async () => {
for (const dir of dirs.splice(0)) {
await rm(dir, { recursive: true, force: true });
}
});
function toolset() {
return {
my_tool: {
name: 'my_tool',
description: 'test tool',
inputSchema: z.object({}),
execute: async (_input: unknown) => ({ markdown: 'ok' }),
},
};
}
it('makes the fire-and-forget transcript write observable after a flush', async () => {
const dir = await mkdtemp(join(tmpdir(), 'ktx-toollog-'));
dirs.push(dir);
const logPath = join(dir, 'wu.jsonl');
const wrapped = wrapToolsWithLogger(toolset(), logPath, 'cards/users');
await wrapped.my_tool.execute({});
// The append is fire-and-forget; flushing must guarantee it has landed.
await flushToolCallLogs();
const entry = JSON.parse((await readFile(logPath, 'utf-8')).trim());
expect(entry.wuKey).toBe('cards/users');
expect(entry.toolName).toBe('my_tool');
expect(typeof entry.durationMs).toBe('number');
});
it('resolves immediately when there is nothing to flush', async () => {
await expect(flushToolCallLogs()).resolves.toBeUndefined();
});
it('is bounded by its timeout and never rejects', async () => {
const dir = await mkdtemp(join(tmpdir(), 'ktx-toollog-'));
dirs.push(dir);
const wrapped = wrapToolsWithLogger(toolset(), join(dir, 'wu.jsonl'), 'wu/1');
await wrapped.my_tool.execute({});
await expect(flushToolCallLogs(0)).resolves.toBeUndefined();
});
});

View file

@ -107,6 +107,73 @@ describe('AiSdkKtxLlmRuntime.runAgentLoop', () => {
expect(result.error).toBe(err); expect(result.error).toBe(err);
}); });
it('returns metrics with stepCount, per-step boundaries, and aggregate token usage', async () => {
(generateText as any).mockImplementation(async (opts: any) => {
await opts.onStepFinish({});
await opts.onStepFinish({});
return {
text: 'ok',
toolCalls: [],
steps: [],
totalUsage: { inputTokens: 100, outputTokens: 20, totalTokens: 120 },
};
});
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
});
expect(result.metrics).toBeDefined();
expect(result.metrics?.stepCount).toBe(2);
expect(result.metrics?.stepBoundariesMs).toHaveLength(2);
expect(result.metrics?.totalMs).toBeGreaterThanOrEqual(0);
expect(result.metrics?.usage).toEqual({ inputTokens: 100, outputTokens: 20, totalTokens: 120 });
});
it('falls back to result.usage when totalUsage is absent', async () => {
(generateText as any).mockResolvedValue({
text: 'ok',
toolCalls: [],
steps: [],
usage: { inputTokens: 7, outputTokens: 3, totalTokens: 10 },
});
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
});
expect(result.metrics?.usage).toEqual({ inputTokens: 7, outputTokens: 3, totalTokens: 10 });
expect(result.metrics?.stepCount).toBe(0);
});
it('returns partial metrics even when the loop errors', async () => {
(generateText as any).mockRejectedValue(new Error('boom'));
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
});
expect(result.stopReason).toBe('error');
expect(result.metrics).toBeDefined();
expect(result.metrics?.stepCount).toBe(0);
expect(result.metrics?.usage).toEqual({});
});
it('invokes caller onStepFinish with incrementing stepIndex and total budget', async () => { it('invokes caller onStepFinish with incrementing stepIndex and total budget', async () => {
const calls: RunLoopStepInfo[] = []; const calls: RunLoopStepInfo[] = [];
(generateText as any).mockImplementation(async (opts: any) => { (generateText as any).mockImplementation(async (opts: any) => {

View file

@ -284,7 +284,7 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
stepBudget: 1, stepBudget: 1,
telemetryTags: { operationName: 'test' }, telemetryTags: { operationName: 'test' },
}), }),
).resolves.toEqual({ stopReason: 'budget' }); ).resolves.toMatchObject({ stopReason: 'budget' });
const options = query.mock.calls[0][0].options; const options = query.mock.calls[0][0].options;
expect(options.allowedTools).toEqual(['mcp__ktx__load_skill']); expect(options.allowedTools).toEqual(['mcp__ktx__load_skill']);
@ -467,7 +467,7 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
telemetryTags: { operationName: 'test' }, telemetryTags: { operationName: 'test' },
onStepFinish, onStepFinish,
}), }),
).resolves.toEqual({ stopReason: 'natural' }); ).resolves.toMatchObject({ stopReason: 'natural' });
expect(onStepFinish).toHaveBeenCalledTimes(1); expect(onStepFinish).toHaveBeenCalledTimes(1);
expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 40 }); expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 40 });
@ -513,7 +513,7 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
throw new Error('callback exploded'); throw new Error('callback exploded');
}, },
}), }),
).resolves.toEqual({ stopReason: 'natural' }); ).resolves.toMatchObject({ stopReason: 'natural' });
expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('callback exploded')); expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('callback exploded'));
}); });
@ -525,6 +525,45 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
expect(mapClaudeCodeStopReason(resultMessage({ subtype: 'error_during_execution' }))).toBe('error'); expect(mapClaudeCodeStopReason(resultMessage({ subtype: 'error_during_execution' }))).toBe('error');
}); });
it('returns loop metrics including step count and mapped token usage', async () => {
const query = vi.fn((_input: any) =>
stream([
initMessage(),
{
type: 'assistant',
message: { role: 'assistant', content: [] },
parent_tool_use_id: null,
uuid: '00000000-0000-4000-8000-000000000006',
session_id: 'session-id',
} as unknown as SDKMessage,
resultMessage({
subtype: 'success',
terminal_reason: 'completed',
usage: { input_tokens: 50, output_tokens: 10 } as never,
}),
]),
);
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
});
const result = await runtime.runAgentLoop({
modelRole: 'default',
systemPrompt: 'system',
userPrompt: 'user',
toolSet: {},
stepBudget: 40,
telemetryTags: { operationName: 'test' },
});
expect(result.metrics?.stepCount).toBe(1);
expect(result.metrics?.stepBoundariesMs).toHaveLength(1);
expect(result.metrics?.usage).toEqual({ inputTokens: 50, outputTokens: 10, totalTokens: 60 });
});
it('auth probe uses isolation options and a scrubbed env', async () => { it('auth probe uses isolation options and a scrubbed env', async () => {
const query = vi.fn((_input: any) => stream([initMessage(), resultMessage({ result: 'ok' })])); const query = vi.fn((_input: any) => stream([initMessage(), resultMessage({ result: 'ok' })]));

View file

@ -50,6 +50,7 @@ connections:
maxConcurrency: 1, maxConcurrency: 1,
failureMode: 'continue', failureMode: 'continue',
}, },
profile: false,
}, },
agent: { agent: {
run_research: { run_research: {
@ -156,6 +157,12 @@ ingest:
}); });
}); });
it('parses the ingest.profile flag (false default, true, or "json")', () => {
expect(parseKtxProjectConfig('ingest:\n adapters: []\n').ingest.profile).toBe(false);
expect(parseKtxProjectConfig('ingest:\n profile: true\n').ingest.profile).toBe(true);
expect(parseKtxProjectConfig('ingest:\n profile: json\n').ingest.profile).toBe('json');
});
it('parses global Vertex LLM config', () => { it('parses global Vertex LLM config', () => {
const config = parseKtxProjectConfig(` const config = parseKtxProjectConfig(`
llm: llm:

View file

@ -34,6 +34,7 @@ describe('buildProjectStackSnapshotFields', () => {
adapters: [], adapters: [],
embeddings: { backend: 'sentence-transformers', dimensions: 384 }, embeddings: { backend: 'sentence-transformers', dimensions: 384 },
workUnits: { stepBudget: 40, maxConcurrency: 1, failureMode: 'continue' }, workUnits: { stepBudget: 40, maxConcurrency: 1, failureMode: 'continue' },
profile: false,
}, },
llm: { provider: { backend: 'none' }, models: {}, promptCaching: {} }, llm: { provider: { backend: 'none' }, models: {}, promptCaching: {} },
scan: { scan: {