diff --git a/packages/cli/src/context/ingest/context-candidates/curator-pagination.service.ts b/packages/cli/src/context/ingest/context-candidates/curator-pagination.service.ts index 7848fab7..fbeab08c 100644 --- a/packages/cli/src/context/ingest/context-candidates/curator-pagination.service.ts +++ b/packages/cli/src/context/ingest/context-candidates/curator-pagination.service.ts @@ -39,7 +39,6 @@ export interface CuratorPaginationInput { buildUserPrompt: (input: CuratorPaginationPromptInput) => string; buildToolSet: (passNumber: number) => KtxRuntimeToolSet; getReconciliationActions: () => MemoryAction[]; - onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void; abortSignal?: AbortSignal; } @@ -245,10 +244,6 @@ export class CuratorPaginationService implements CuratorPaginationPort { jobId: params.input.jobId, forceRun: params.forceRun, abortSignal: params.input.abortSignal, - onStepFinish: params.input.onStepFinish - ? ({ stepIndex, stepBudget }) => - params.input.onStepFinish?.({ passNumber: params.passNumber, stepIndex, stepBudget }) - : undefined, }); } diff --git a/packages/cli/src/context/ingest/ingest-bundle.runner.ts b/packages/cli/src/context/ingest/ingest-bundle.runner.ts index a242d58a..6f5372d2 100644 --- a/packages/cli/src/context/ingest/ingest-bundle.runner.ts +++ b/packages/cli/src/context/ingest/ingest-bundle.runner.ts @@ -939,14 +939,13 @@ export class IngestBundleRunner { workUnitSettings: { maxConcurrency: number; stepBudget: number; failureMode: 'abort' | 'continue' }; transcriptDir: string; transcriptSummaries: Map; - recordTranscriptEntry(path: string): (entry: ToolCallLogEntry) => void; + recordTranscriptEntry(path: string): (entry: ToolCallLogEntry) => MutableToolTranscriptSummary; stageIndex: StageIndex; includeContextEvidenceTools: boolean; currentTableExists(tableRef: string): Promise; memoryFlow?: MemoryFlowEventSink; abortSignal?: AbortSignal; wuSkillNames: string[]; - onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void; }): Promise { const session: CaptureSession = { userId: 'system', @@ -1050,7 +1049,6 @@ export class IngestBundleRunner { type: 'work_unit_started', unitKey: input.wu.unitKey, skills: input.wuSkillNames, - stepBudget: input.workUnitSettings.stepBudget, }); return executeWorkUnit( { @@ -1074,8 +1072,10 @@ export class IngestBundleRunner { slIndex: input.slIndex, priorProvenance: input.priorProvenance, }), - buildToolSet: (wuInner) => - wrapToolsWithLogger( + buildToolSet: (wuInner) => { + const transcriptPath = join(input.transcriptDir, `${wuInner.unitKey}.jsonl`); + const record = input.recordTranscriptEntry(transcriptPath); + return wrapToolsWithLogger( buildWuToolSet({ sourceKey: input.job.sourceKey, stagedDir: input.stagedDir, @@ -1084,10 +1084,23 @@ export class IngestBundleRunner { emitUnmappedFallbackTool: wuEmitUnmappedFallbackTool, toolsetTools: wuToolset.toRuntimeTools(wuToolContext), }), - join(input.transcriptDir, `${wuInner.unitKey}.jsonl`), + transcriptPath, wuInner.unitKey, - { onEntry: input.recordTranscriptEntry(join(input.transcriptDir, `${wuInner.unitKey}.jsonl`)) }, - ), + { + // Drive the live HUD heartbeat from real tool calls: each invocation + // ticks the running per-unit count. This is an observed signal, not a + // re-derived turn count, so it can never overshoot a budget. + onEntry: (entry) => { + const summary = record(entry); + input.memoryFlow?.emit({ + type: 'work_unit_step', + unitKey: wuInner.unitKey, + toolCalls: summary.toolCallCount, + }); + }, + }, + ); + }, captureSession: session, sessionActions, modelRole: 'candidateExtraction', @@ -1096,7 +1109,6 @@ export class IngestBundleRunner { connectionId: input.job.connectionId, jobId: input.job.jobId, toolFailureCount: (unitKey) => input.transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0, - onStepFinish: input.onStepFinish, abortSignal: input.abortSignal, }, input.wu, @@ -1166,11 +1178,12 @@ export class IngestBundleRunner { const transcriptDir = this.deps.storage.resolveTranscriptDir(job.jobId); const recordTranscriptEntry = (path: string) => - (entry: ToolCallLogEntry): void => { + (entry: ToolCallLogEntry): MutableToolTranscriptSummary => { const current = transcriptSummaries.get(entry.wuKey) ?? createMutableToolTranscriptSummary(entry.wuKey, path); recordToolTranscriptEntry(current, entry); transcriptSummaries.set(entry.wuKey, current); + return current; }; const overrideReport = await this.loadOverrideReport(job); @@ -1639,9 +1652,6 @@ export class IngestBundleRunner { abortSignal: ctx?.abortSignal, memoryFlow, wuSkillNames, - onStepFinish: ({ stepIndex, stepBudget }) => { - memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget }); - }, }); }, }); @@ -2013,6 +2023,45 @@ export class IngestBundleRunner { let curatorWarnings: string[] = []; let reconcileOutcome: Awaited>; + // Reconcile shares the work-unit liveness model: the HUD heartbeat is driven + // by real tool calls (a monotonic, observed count), not a re-derived turn + // counter. The soft cap only paces the phase progress bar; it is never shown + // to the user, so it cannot read as a misleading "X/Y" fraction. + const reconcileTranscriptPath = join(transcriptDir, 'reconcile.jsonl'); + const reconcileProgressSoftCap = 40; + const buildReconcileToolSetWithHeartbeat = (): KtxRuntimeToolSet => { + const record = recordTranscriptEntry(reconcileTranscriptPath); + return wrapToolsWithLogger( + buildReconcileToolSet({ + loadSkillTool: rcLoadSkill, + stageListTool: rcStageListTool, + stageDiffTool: rcStageDiffTool, + evictionListTool: rcEvictionListTool, + emitConflictResolutionTool: rcEmitConflictResolutionTool, + emitEvictionDecisionTool: rcEmitEvictionDecisionTool, + emitArtifactResolutionTool: rcEmitArtifactResolutionTool, + emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool, + readRawSpanTool: rcRawSpanTool, + toolsetTools: rcToolset.toRuntimeTools(rcToolContext), + }), + reconcileTranscriptPath, + 'reconcile', + { + onEntry: (entry) => { + const summary = record(entry); + if (!stage4) { + return; + } + const label = `Reconciling results · ${summary.toolCallCount} action${ + summary.toolCallCount === 1 ? '' : 's' + }`; + emitStageProgress('reconciliation', 85, label, { transient: true }); + void stage4.updateProgress(Math.min(0.95, summary.toolCallCount / reconcileProgressSoftCap), label); + }, + }, + ); + }; + const reconcileStartedAt = Date.now(); const reconcileMode = contextReport && this.deps.curatorPagination ? 'curator' : 'single'; if (contextReport && this.deps.curatorPagination) { @@ -2035,39 +2084,8 @@ export class IngestBundleRunner { }), buildUserPrompt: ({ summary, items, runState }) => buildReconcileUserPrompt(stageIndex, eviction, { summary, items }, reconcileNotes, runState), - buildToolSet: (_passNumber) => - wrapToolsWithLogger( - buildReconcileToolSet({ - loadSkillTool: rcLoadSkill, - stageListTool: rcStageListTool, - stageDiffTool: rcStageDiffTool, - evictionListTool: rcEvictionListTool, - emitConflictResolutionTool: rcEmitConflictResolutionTool, - emitEvictionDecisionTool: rcEmitEvictionDecisionTool, - emitArtifactResolutionTool: rcEmitArtifactResolutionTool, - emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool, - readRawSpanTool: rcRawSpanTool, - toolsetTools: rcToolset.toRuntimeTools(rcToolContext), - }), - join(transcriptDir, 'reconcile.jsonl'), - 'reconcile', - { onEntry: recordTranscriptEntry(join(transcriptDir, 'reconcile.jsonl')) }, - ), + buildToolSet: (_passNumber) => buildReconcileToolSetWithHeartbeat(), getReconciliationActions: () => reconcileActions, - onStepFinish: stage4 - ? ({ passNumber, stepIndex, stepBudget }) => { - emitStageProgress( - 'reconciliation', - 85, - `Reconciling results: pass ${passNumber} step ${stepIndex}/${stepBudget}`, - { transient: true }, - ); - void stage4.updateProgress( - stepIndex / stepBudget, - `Reconciling results · pass ${passNumber} step ${stepIndex}`, - ); - } - : undefined, abortSignal: ctx?.abortSignal, }); curatorReport = curatorOutcome.report; @@ -2091,38 +2109,13 @@ export class IngestBundleRunner { canonicalPins: relevantCanonicalPins, }), buildUserPrompt: (idx, ev) => buildReconcileUserPrompt(idx, ev, undefined, reconcileNotes), - buildToolSet: () => - wrapToolsWithLogger( - buildReconcileToolSet({ - loadSkillTool: rcLoadSkill, - stageListTool: rcStageListTool, - stageDiffTool: rcStageDiffTool, - evictionListTool: rcEvictionListTool, - emitConflictResolutionTool: rcEmitConflictResolutionTool, - emitEvictionDecisionTool: rcEmitEvictionDecisionTool, - emitArtifactResolutionTool: rcEmitArtifactResolutionTool, - emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool, - readRawSpanTool: rcRawSpanTool, - toolsetTools: rcToolset.toRuntimeTools(rcToolContext), - }), - join(transcriptDir, 'reconcile.jsonl'), - 'reconcile', - { onEntry: recordTranscriptEntry(join(transcriptDir, 'reconcile.jsonl')) }, - ), + buildToolSet: () => buildReconcileToolSetWithHeartbeat(), modelRole: 'reconcile', stepBudget: 60, sourceKey: job.sourceKey, jobId: job.jobId, force: !!overrideReport, abortSignal: ctx?.abortSignal, - onStepFinish: stage4 - ? ({ stepIndex, stepBudget }) => { - emitStageProgress('reconciliation', 85, `Reconciling results: step ${stepIndex}/${stepBudget}`, { - transient: true, - }); - void stage4.updateProgress(stepIndex / stepBudget, `Reconciling results · step ${stepIndex}`); - } - : undefined, }); } await runTrace.event( diff --git a/packages/cli/src/context/ingest/memory-flow/events.ts b/packages/cli/src/context/ingest/memory-flow/events.ts index 92cebe0f..27ed6ad6 100644 --- a/packages/cli/src/context/ingest/memory-flow/events.ts +++ b/packages/cli/src/context/ingest/memory-flow/events.ts @@ -174,7 +174,7 @@ export function ingestReportToMemoryFlowReplay( const actions = allReportActions(report); const workUnitEvents: MemoryFlowEvent[] = report.body.workUnits.flatMap((workUnit) => [ - { type: 'work_unit_started', unitKey: workUnit.unitKey, skills: [], stepBudget: 0 } satisfies MemoryFlowEvent, + { type: 'work_unit_started', unitKey: workUnit.unitKey, skills: [] } satisfies MemoryFlowEvent, ...workUnit.actions.map( (action): MemoryFlowEvent => ({ type: 'candidate_action', diff --git a/packages/cli/src/context/ingest/memory-flow/schema.ts b/packages/cli/src/context/ingest/memory-flow/schema.ts index f448bbc8..939d5a18 100644 --- a/packages/cli/src/context/ingest/memory-flow/schema.ts +++ b/packages/cli/src/context/ingest/memory-flow/schema.ts @@ -81,13 +81,11 @@ const memoryFlowEventSchema = z.discriminatedUnion('type', [ type: z.literal('work_unit_started'), unitKey: z.string().min(1), skills: z.array(z.string().min(1)), - stepBudget: z.number().int().min(0), }), eventSchema({ type: z.literal('work_unit_step'), unitKey: z.string().min(1), - stepIndex: z.number().int().min(0), - stepBudget: z.number().int().min(0), + toolCalls: z.number().int().min(0), }), eventSchema({ type: z.literal('candidate_action'), diff --git a/packages/cli/src/context/ingest/memory-flow/types.ts b/packages/cli/src/context/ingest/memory-flow/types.ts index 72f1b6de..e620189e 100644 --- a/packages/cli/src/context/ingest/memory-flow/types.ts +++ b/packages/cli/src/context/ingest/memory-flow/types.ts @@ -71,13 +71,11 @@ type MemoryFlowEventPayload = type: 'work_unit_started'; unitKey: string; skills: string[]; - stepBudget: number; } | { type: 'work_unit_step'; unitKey: string; - stepIndex: number; - stepBudget: number; + toolCalls: number; } | { type: 'candidate_action'; diff --git a/packages/cli/src/context/ingest/ports.ts b/packages/cli/src/context/ingest/ports.ts index 7532919e..88294f59 100644 --- a/packages/cli/src/context/ingest/ports.ts +++ b/packages/cli/src/context/ingest/ports.ts @@ -324,7 +324,6 @@ export interface CuratorPaginationPort { }) => string; buildToolSet: (passNumber: number) => KtxRuntimeToolSet; getReconciliationActions: () => MemoryAction[]; - onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void; abortSignal?: AbortSignal; }): Promise; } diff --git a/packages/cli/src/context/ingest/stages/stage-3-work-units.ts b/packages/cli/src/context/ingest/stages/stage-3-work-units.ts index a7387c8a..9e4bbbc6 100644 --- a/packages/cli/src/context/ingest/stages/stage-3-work-units.ts +++ b/packages/cli/src/context/ingest/stages/stage-3-work-units.ts @@ -28,7 +28,6 @@ export interface WorkUnitExecutionDeps { sourceKey: string; connectionId: string; jobId: string; - onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void; abortSignal?: AbortSignal; toolFailureCount?: (unitKey: string) => number; } @@ -107,7 +106,6 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit) unitKey: wu.unitKey, jobId: deps.jobId, }, - onStepFinish: deps.onStepFinish, abortSignal: deps.abortSignal, }); } catch (error) { diff --git a/packages/cli/src/context/ingest/stages/stage-4-reconciliation.ts b/packages/cli/src/context/ingest/stages/stage-4-reconciliation.ts index c78e1b48..d87a8b80 100644 --- a/packages/cli/src/context/ingest/stages/stage-4-reconciliation.ts +++ b/packages/cli/src/context/ingest/stages/stage-4-reconciliation.ts @@ -15,7 +15,6 @@ export interface ReconciliationContext { sourceKey: string; jobId: string; force?: boolean; - onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void; abortSignal?: AbortSignal; forceRun?: boolean; } @@ -40,7 +39,6 @@ export async function runReconciliationStage4(ctx: ReconciliationContext): Promi toolSet: ctx.buildToolSet(), stepBudget: ctx.stepBudget, telemetryTags: { operationName: 'ingest-bundle-reconcile', source: ctx.sourceKey, jobId: ctx.jobId }, - onStepFinish: ctx.onStepFinish, abortSignal: ctx.abortSignal, }); return { skipped: false, stopReason: run.stopReason, error: run.error, ...(run.metrics ? { metrics: run.metrics } : {}) }; diff --git a/packages/cli/src/context/llm/ai-sdk-runtime.ts b/packages/cli/src/context/llm/ai-sdk-runtime.ts index d5a60c7b..81ada6ea 100644 --- a/packages/cli/src/context/llm/ai-sdk-runtime.ts +++ b/packages/cli/src/context/llm/ai-sdk-runtime.ts @@ -322,21 +322,11 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { messages: promptMessages.messages, tools: built.tools as ToolSet, ...(params.abortSignal ? { abortSignal: params.abortSignal } : {}), - onStepFinish: async () => { + // Count model round-trips locally for metrics. `stepCountIs(stepBudget)` + // caps the loop, so this counter never exceeds the budget. + onStepFinish: () => { stepIndex += 1; stepBoundariesMs.push(Date.now() - startedAt); - if (!params.onStepFinish) { - return; - } - try { - await params.onStepFinish({ stepIndex, stepBudget: params.stepBudget }); - } catch (err) { - this.logger.warn( - `[agent-runner] onStepFinish callback threw; ignoring: ${ - err instanceof Error ? err.message : String(err) - }`, - ); - } }, }; const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), params.abortSignal, () => generateText(request)); diff --git a/packages/cli/src/context/llm/claude-code-runtime.ts b/packages/cli/src/context/llm/claude-code-runtime.ts index 26bd0529..9d0cff70 100644 --- a/packages/cli/src/context/llm/claude-code-runtime.ts +++ b/packages/cli/src/context/llm/claude-code-runtime.ts @@ -6,7 +6,6 @@ import { type SDKResultMessage, } from '@anthropic-ai/claude-agent-sdk'; import { z } from 'zod'; -import { noopLogger, type KtxLogger } from '../../context/core/config.js'; import { createAbortError, isAbortError, throwIfAborted } from '../core/abort.js'; import { createKtxClaudeCodeEnv } from './claude-code-env.js'; import { resolveClaudeCodeModel } from './claude-code-models.js'; @@ -53,7 +52,6 @@ export interface ClaudeCodeKtxLlmRuntimeDeps { modelSlots: { default: string } & Partial>; query?: QueryFn; env?: NodeJS.ProcessEnv; - logger?: KtxLogger; rateLimitGovernor?: Pick; } @@ -85,22 +83,6 @@ function isResult(message: SDKMessage): message is SDKResultMessage { return message.type === 'result'; } -// Skip emissions the SDK does not count toward `num_turns`: `pause_turn` continuations and -// errored partials (e.g. `max_output_tokens`) it retries internally. Without this, the -// runtime's step counter outruns `maxTurns` and the HUD renders e.g. `step 69/40`. -function countsAsAssistantTurn(message: SDKMessage): boolean { - if (message.type !== 'assistant' || message.parent_tool_use_id !== null) { - return false; - } - if (message.error !== undefined) { - return false; - } - if (message.message.stop_reason === 'pause_turn') { - return false; - } - return true; -} - function resultError(result: SDKResultMessage): Error | undefined { if (result.subtype === 'success') { return undefined; @@ -296,7 +278,6 @@ async function collectResult(params: { options: Options; allowedToolIds: Set; expectedMcpServerNames: Set; - onAssistantTurn?: () => Promise; rateLimitGovernor?: Pick; abortSignal?: AbortSignal; }): Promise { @@ -321,9 +302,6 @@ async function collectResult(params: { params.rateLimitGovernor?.report(rateLimitSignal); } assertInitIsolation(message, params.allowedToolIds, params.expectedMcpServerNames); - if (countsAsAssistantTurn(message)) { - await params.onAssistantTurn?.(); - } if (isResult(message)) { result = message; } @@ -358,11 +336,9 @@ async function collectResultWithRateLimitRetry(params: Parameters { @@ -434,9 +410,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { } async runAgentLoop(params: RunLoopParams): Promise { - let stepIndex = 0; const startedAt = Date.now(); - const stepBoundariesMs: number[] = []; try { const options = baseOptions({ projectDir: this.deps.projectDir, @@ -453,22 +427,6 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { expectedMcpServerNames: expectedMcpServerNames(params.toolSet), rateLimitGovernor: this.deps.rateLimitGovernor, abortSignal: params.abortSignal, - onAssistantTurn: async () => { - stepIndex += 1; - stepBoundariesMs.push(Date.now() - startedAt); - if (!params.onStepFinish) { - return; - } - try { - await params.onStepFinish({ stepIndex, stepBudget: params.stepBudget }); - } catch (error) { - this.logger.warn( - `[claude-code-runner] onStepFinish callback threw; ignoring: ${ - error instanceof Error ? error.message : String(error) - }`, - ); - } - }, }); const stopReason = mapClaudeCodeStopReason(result); const error = resultError(result); @@ -477,8 +435,12 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { ...(stopReason === 'error' && error ? { error } : {}), metrics: { totalMs: Date.now() - startedAt, - stepCount: stepIndex, - stepBoundariesMs, + // Authoritative turn count from the SDK result. The runtime no longer + // re-derives a per-turn counter: it could not match the SDK's `num_turns` + // and overshot `maxTurns` (the source of the misleading `step 70/40`). + // Per-step boundaries require that counter and are not consumed anywhere. + stepCount: result.num_turns, + stepBoundariesMs: [], usage: claudeTokenUsage(result), }, }; @@ -490,7 +452,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { return { stopReason: 'error', error: err, - metrics: { totalMs: Date.now() - startedAt, stepCount: stepIndex, stepBoundariesMs, usage: {} }, + metrics: { totalMs: Date.now() - startedAt, stepCount: 0, stepBoundariesMs: [], usage: {} }, }; } } diff --git a/packages/cli/src/context/llm/codex-runtime.ts b/packages/cli/src/context/llm/codex-runtime.ts index 2958b3f8..ce6f609c 100644 --- a/packages/cli/src/context/llm/codex-runtime.ts +++ b/packages/cli/src/context/llm/codex-runtime.ts @@ -1,5 +1,4 @@ import { z } from 'zod'; -import { noopLogger, type KtxLogger } from '../core/config.js'; import { isAbortError, linkAbortSignal } from '../core/abort.js'; import { isCompletedAgentStep, summarizeCodexExecEvents, type CodexExecEventSummary } from './codex-exec-events.js'; import { @@ -25,7 +24,6 @@ export interface CodexKtxLlmRuntimeDeps { modelSlots: { default: string } & Partial>; runner?: CodexSdkRunner; startMcpServer?: (input: { projectDir: string; toolSet: KtxRuntimeToolSet }) => Promise; - logger?: KtxLogger; rateLimitGovernor?: Pick; } @@ -40,7 +38,6 @@ function promptWithSystem(system: string | undefined, prompt: string): string { interface CollectCodexEventsOptions { stepBudget?: number; abortController?: AbortController; - onStep?: (stepIndex: number) => void | Promise; } interface CollectCodexEventsResult { @@ -58,8 +55,8 @@ function isTurnCompleted(event: unknown): boolean { } /** - * Drains the Codex stream once, emitting a step as each agent action completes - * so callers see live progress and the step budget is enforced mid-run. Every + * Drains the Codex stream once, counting each completed agent action so the + * step budget is enforced mid-run. Every * completed agent-action item counts (see {@link isCompletedAgentStep}), so * built-in `command_execution` steps decrement the budget the same as * `mcp_tool_call`s. A turn that produced no actions still counts as one step, @@ -93,7 +90,6 @@ async function collectEvents( } completedSteps += 1; - await options.onStep?.(completedSteps); if (isActionStep && options.stepBudget !== undefined && completedSteps >= options.stepBudget) { budgetExceeded = true; options.abortController?.abort(); @@ -170,11 +166,9 @@ function isCodexRateLimitError(error: Error | undefined): boolean { export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { private readonly runner: CodexSdkRunner; - private readonly logger: KtxLogger; constructor(private readonly deps: CodexKtxLlmRuntimeDeps) { this.runner = deps.runner ?? new CodexSdkCliRunner(); - this.logger = deps.logger ?? noopLogger; } private async runWithRateLimitRetry( @@ -328,15 +322,6 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { } : {}), }); - const onStep = async (stepIndex: number): Promise => { - try { - await params.onStepFinish?.({ stepIndex, stepBudget: params.stepBudget }); - } catch (error) { - this.logger.warn( - `[codex-runner] onStepFinish callback threw; ignoring: ${error instanceof Error ? error.message : String(error)}`, - ); - } - }; const result = await this.runWithRateLimitRetry( params.abortSignal, async () => { @@ -352,7 +337,7 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { env: config.env, signal: abortController.signal, }), - { stepBudget: params.stepBudget, abortController, onStep }, + { stepBudget: params.stepBudget, abortController }, ); const summary = summarizeCodexExecEvents(collected.events, { startedAt }); return { collected, summary }; diff --git a/packages/cli/src/context/llm/runtime-port.ts b/packages/cli/src/context/llm/runtime-port.ts index 9fec6208..c55e3c7a 100644 --- a/packages/cli/src/context/llm/runtime-port.ts +++ b/packages/cli/src/context/llm/runtime-port.ts @@ -17,12 +17,6 @@ export type KtxRuntimeToolSet = Record; export type RunLoopStopReason = 'budget' | 'natural' | 'error'; -/** @internal */ -export interface RunLoopStepInfo { - stepIndex: number; - stepBudget: number; -} - export interface LlmTokenUsage { inputTokens?: number; outputTokens?: number; @@ -48,7 +42,6 @@ export interface RunLoopParams { toolSet: KtxRuntimeToolSet; stepBudget: number; telemetryTags: Record; - onStepFinish?: (info: RunLoopStepInfo) => void | Promise; abortSignal?: AbortSignal; } diff --git a/packages/cli/src/demo-metrics.ts b/packages/cli/src/demo-metrics.ts index d6f9c207..70b0328c 100644 --- a/packages/cli/src/demo-metrics.ts +++ b/packages/cli/src/demo-metrics.ts @@ -15,8 +15,6 @@ interface DemoMetricsTuning { interface DemoMetricsSnapshot { elapsedMs: number; etaMs: number | null; - agentSteps: number; - agentStepBudget: number; toolCalls: number; workUnitsStarted: number; workUnitsFinished: number; @@ -37,18 +35,6 @@ function eventsOf( return events.filter((event): event is Extract => event.type === type); } -function maxAgentStep(events: MemoryFlowEvent[]): { step: number; budget: number } { - const steps = eventsOf(events, 'work_unit_step'); - const started = eventsOf(events, 'work_unit_started'); - const stepIndex = steps.reduce((max, event) => Math.max(max, event.stepIndex), 0); - const stepBudget = Math.max( - 0, - ...steps.map((event) => event.stepBudget), - ...started.map((event) => event.stepBudget), - ); - return { step: stepIndex, budget: stepBudget }; -} - function totalToolCalls(input: MemoryFlowReplayInput): number { return input.details.transcripts.reduce((total, transcript) => total + transcript.toolCallCount, 0); } @@ -96,11 +82,10 @@ export function buildDemoMetrics( const nowMs = (options.now ?? Date.now)(); const elapsedMs = elapsedMsFromEvents(input.events, nowMs); - const { step, budget } = maxAgentStep(input.events); const toolCalls = totalToolCalls(input); const progress = workUnitProgress(input); const finishedCount = eventsOf(input.events, 'work_unit_finished').length; - const stepDriver = Math.max(step, toolCalls, finishedCount * 4); + const stepDriver = Math.max(toolCalls, finishedCount * 4); const inputTokens = stepDriver * inputTokensPerStep; const outputTokens = stepDriver * outputTokensPerStep; @@ -113,8 +98,6 @@ export function buildDemoMetrics( return { elapsedMs, etaMs: estimateEtaMs(elapsedMs, progress.finished, progress.total, input.status), - agentSteps: step, - agentStepBudget: budget, toolCalls, workUnitsStarted: progress.started, workUnitsFinished: progress.finished, diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index 319c3d1b..233b1b6e 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -398,9 +398,8 @@ function plainIngestEventProgress( const total = plannedWorkUnitCountThrough(snapshot, eventIndex); const completed = completedWorkUnitCountThrough(snapshot, eventIndex); const active = activeWorkUnitCountThrough(snapshot, eventIndex); - const stepFraction = event.stepBudget > 0 ? Math.min(1, event.stepIndex / event.stepBudget) : 0; - const percent = total > 0 ? 55 + Math.ceil(((completed + stepFraction) / total) * 25) : 55; - const latest = `${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`; + const percent = total > 0 ? 55 + Math.ceil((completed / total) * 25) : 55; + const latest = `${event.unitKey} · ${pluralize(event.toolCalls, 'action')}`; return { percent, message: `Processing tasks: ${completed}/${total} complete, ${active} active; latest ${latest}`, diff --git a/packages/cli/src/memory-flow-hud.tsx b/packages/cli/src/memory-flow-hud.tsx index 8b044122..5a09bf08 100644 --- a/packages/cli/src/memory-flow-hud.tsx +++ b/packages/cli/src/memory-flow-hud.tsx @@ -139,31 +139,21 @@ function sourceDescription(input: MemoryFlowReplayInput): SourceInfo { return { type: info.type, name: conn, sourceCount: count, itemNounPlural: info.plural, readingVerb: info.verb, ingestDescription: info.description }; } -function activeWorkUnits( - input: MemoryFlowReplayInput, -): Array<{ unitKey: string; stepIndex: number; stepBudget: number }> { +function activeWorkUnits(input: MemoryFlowReplayInput): string[] { const finishedKeys = new Set(); - const unitMap = new Map(); - for (const e of input.events) { - if (e.type === 'work_unit_started') { - unitMap.set(e.unitKey, { stepIndex: 0, stepBudget: e.stepBudget }); - } - if (e.type === 'work_unit_step') { - const existing = unitMap.get(e.unitKey); - if (existing) { - existing.stepIndex = e.stepIndex; - existing.stepBudget = e.stepBudget; - } - } if (e.type === 'work_unit_finished') finishedKeys.add(e.unitKey); } - const result: Array<{ unitKey: string; stepIndex: number; stepBudget: number }> = []; - for (const [unitKey, data] of unitMap) { - if (!finishedKeys.has(unitKey)) result.push({ unitKey, ...data }); + const active: string[] = []; + const seen = new Set(); + for (const e of input.events) { + if (e.type === 'work_unit_started' && !finishedKeys.has(e.unitKey) && !seen.has(e.unitKey)) { + seen.add(e.unitKey); + active.push(e.unitKey); + } } - return result; + return active; } function queuedWorkUnits(input: MemoryFlowReplayInput): string[] { diff --git a/packages/cli/test/context/ingest/ingest-bundle.runner.test.ts b/packages/cli/test/context/ingest/ingest-bundle.runner.test.ts index b491acf2..68814792 100644 --- a/packages/cli/test/context/ingest/ingest-bundle.runner.test.ts +++ b/packages/cli/test/context/ingest/ingest-bundle.runner.test.ts @@ -904,7 +904,11 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { }); deps.agentRunner.runLoop.mockImplementation(async (params: any) => { if (params.telemetryTags.operationName === 'ingest-bundle-wu') { - await params.onStepFinish?.({ stepIndex: 1, stepBudget: params.stepBudget }); + // A real tool call drives the live work_unit_step heartbeat. + await params.toolSet.record_verification_ledger.execute( + { summary: 'Captured order context.', verifiedIdentifiers: [], unverifiedIdentifiers: [] }, + { toolCallId: 'ledger-1', messages: [] }, + ); currentToolSession.actions.push({ target: 'wiki', type: 'created', @@ -948,9 +952,8 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { type: 'work_unit_started', unitKey: 'u1', skills: ['ingest_triage', 'sl_capture', 'wiki_capture'], - stepBudget: 40, }), - expect.objectContaining({ type: 'work_unit_step', unitKey: 'u1', stepIndex: 1, stepBudget: 40 }), + expect.objectContaining({ type: 'work_unit_step', unitKey: 'u1', toolCalls: 1 }), expect.objectContaining({ type: 'candidate_action', unitKey: 'u1', @@ -2226,22 +2229,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { it('emits a monotonically non-decreasing progress sequence reaching 1.0, covering all 7 stages', async () => { const deps = makeDeps(); - // Simulate an agent that calls onStepFinish a few times so stage 3 and 4 emit per-step progress. - deps.agentRunner.runLoop.mockImplementation(async (params: any) => { - if (params.onStepFinish) { - for (let i = 1; i <= 3; i++) { - await params.onStepFinish({ stepIndex: i, stepBudget: params.stepBudget }); - } - } - return { stopReason: 'natural' }; - }); - // Trigger Stage 4 reconciliation by having at least one action. - deps.agentRunner.runLoop.mockImplementation(async (params: any) => { - if (params.onStepFinish) { - await params.onStepFinish({ stepIndex: 1, stepBudget: params.stepBudget }); - } - return { stopReason: 'natural' }; - }); + deps.agentRunner.runLoop.mockImplementation(async () => ({ stopReason: 'natural' })); const runner = buildRunner(deps); (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ diff --git a/packages/cli/test/context/ingest/memory-flow/acceptance-fixtures.ts b/packages/cli/test/context/ingest/memory-flow/acceptance-fixtures.ts index 1d57aaa8..a3decab3 100644 --- a/packages/cli/test/context/ingest/memory-flow/acceptance-fixtures.ts +++ b/packages/cli/test/context/ingest/memory-flow/acceptance-fixtures.ts @@ -16,11 +16,11 @@ function baseScenario(overrides: Partial = {}): MemoryFlo { type: 'raw_snapshot_written', syncId: 'sync-success', rawFileCount: 4 }, { type: 'diff_computed', added: 2, modified: 1, deleted: 0, unchanged: 1 }, { type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] }, { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/global/orders.md' }, { type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, - { type: 'work_unit_started', unitKey: 'revenue', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'revenue', skills: ['wiki_capture'] }, { type: 'candidate_action', unitKey: 'revenue', target: 'wiki', action: 'updated', key: 'wiki/global/revenue.md' }, { type: 'work_unit_finished', unitKey: 'revenue', status: 'success' }, { type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 }, @@ -111,7 +111,7 @@ export function validationRevertScenario(): MemoryFlowReplayInput { { type: 'raw_snapshot_written', syncId: 'sync-validation', rawFileCount: 1 }, { type: 'diff_computed', added: 1, modified: 0, deleted: 0, unchanged: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] }, { type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' }, { type: 'work_unit_finished', diff --git a/packages/cli/test/context/ingest/memory-flow/schema.test.ts b/packages/cli/test/context/ingest/memory-flow/schema.test.ts index ee8f3bb9..a0ae40a8 100644 --- a/packages/cli/test/context/ingest/memory-flow/schema.test.ts +++ b/packages/cli/test/context/ingest/memory-flow/schema.test.ts @@ -22,8 +22,8 @@ function snapshot(overrides: Partial = {}): MemoryFlowRep { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, { type: 'stage_progress', stage: 'integration', percent: 80, message: 'Integrating 1/1 patches: orders' }, - { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, - { type: 'work_unit_step', unitKey: 'orders', stepIndex: 1, stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] }, + { type: 'work_unit_step', unitKey: 'orders', toolCalls: 1 }, { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, { type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 }, diff --git a/packages/cli/test/context/ingest/memory-flow/view-model.test.ts b/packages/cli/test/context/ingest/memory-flow/view-model.test.ts index 6bd64943..5d3c7778 100644 --- a/packages/cli/test/context/ingest/memory-flow/view-model.test.ts +++ b/packages/cli/test/context/ingest/memory-flow/view-model.test.ts @@ -60,7 +60,7 @@ function replayInput(): MemoryFlowReplayInput { { type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 }, { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 3 }, { type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] }, { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' }, { type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, @@ -159,7 +159,7 @@ describe('buildMemoryFlowViewModel', () => { { type: 'source_acquired', adapter: 'looker', trigger: 'demo_seeded', fileCount: 7 }, { type: 'source_acquired', adapter: 'notion', trigger: 'demo_seeded', fileCount: 8 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'revenue-and-contracts', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'revenue-and-contracts', skills: ['wiki_capture'] }, { type: 'candidate_action', unitKey: 'revenue-and-contracts', @@ -376,7 +376,7 @@ describe('buildMemoryFlowViewModel', () => { { type: 'raw_snapshot_written', syncId: 'sync-errors', rawFileCount: 2 }, { type: 'diff_computed', added: 2, modified: 0, deleted: 0, unchanged: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] }, { type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' }, { type: 'work_unit_finished', @@ -402,7 +402,7 @@ describe('buildMemoryFlowViewModel', () => { events: [ { type: 'source_acquired', adapter: 'metricflow', trigger: 'manual_resync', fileCount: 1 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'docs', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'docs', skills: ['wiki_capture'] }, { type: 'work_unit_finished', unitKey: 'docs', status: 'failed', reason: 'agent step budget exhausted' }, ], plannedWorkUnits: [{ unitKey: 'docs', rawFiles: ['docs.md'], peerFileCount: 0, dependencyCount: 0 }], diff --git a/packages/cli/test/context/llm/ai-sdk-runtime.test.ts b/packages/cli/test/context/llm/ai-sdk-runtime.test.ts index bab7d1d7..6c0bbe1d 100644 --- a/packages/cli/test/context/llm/ai-sdk-runtime.test.ts +++ b/packages/cli/test/context/llm/ai-sdk-runtime.test.ts @@ -8,7 +8,6 @@ vi.mock('ai', () => ({ import { generateText } from 'ai'; import { AiSdkKtxLlmRuntime } from '../../../src/context/llm/ai-sdk-runtime.js'; -import type { RunLoopStepInfo } from '../../../src/context/llm/runtime-port.js'; describe('AiSdkKtxLlmRuntime.runAgentLoop', () => { let runtime: AiSdkKtxLlmRuntime; @@ -367,40 +366,14 @@ describe('AiSdkKtxLlmRuntime.runAgentLoop', () => { expect(result.metrics?.usage).toEqual({}); }); - it('invokes caller onStepFinish with incrementing stepIndex and total budget', async () => { - const calls: RunLoopStepInfo[] = []; + it('counts model round-trips into metrics.stepCount', async () => { (generateText as any).mockImplementation(async (opts: any) => { for (let i = 0; i < 3; i++) { - await opts.onStepFinish({}); + opts.onStepFinish({}); } return { text: 'ok', toolCalls: [], steps: [] }; }); - await runtime.runAgentLoop({ - modelRole: 'candidateExtraction', - systemPrompt: '', - userPrompt: '', - toolSet: {}, - stepBudget: 10, - telemetryTags: {}, - onStepFinish: (info) => { - calls.push(info); - }, - }); - - expect(calls).toEqual([ - { stepIndex: 1, stepBudget: 10 }, - { stepIndex: 2, stepBudget: 10 }, - { stepIndex: 3, stepBudget: 10 }, - ]); - }); - - it('swallows errors thrown from caller onStepFinish without aborting the loop', async () => { - (generateText as any).mockImplementation(async (opts: any) => { - await opts.onStepFinish({}); - return { text: 'ok', toolCalls: [], steps: [] }; - }); - const result = await runtime.runAgentLoop({ modelRole: 'candidateExtraction', systemPrompt: '', @@ -408,12 +381,10 @@ describe('AiSdkKtxLlmRuntime.runAgentLoop', () => { toolSet: {}, stepBudget: 10, telemetryTags: {}, - onStepFinish: () => { - throw new Error('boom'); - }, }); - expect(result.stopReason).toBe('natural'); + expect(result.metrics?.stepCount).toBe(3); + expect(result.metrics?.stepBoundariesMs).toHaveLength(3); }); it('forwards telemetryTags.source through experimental_telemetry metadata', async () => { diff --git a/packages/cli/test/context/llm/claude-code-runtime.test.ts b/packages/cli/test/context/llm/claude-code-runtime.test.ts index ba83cde6..182fdbc5 100644 --- a/packages/cli/test/context/llm/claude-code-runtime.test.ts +++ b/packages/cli/test/context/llm/claude-code-runtime.test.ts @@ -382,7 +382,6 @@ describe('ClaudeCodeKtxLlmRuntime', () => { query, env: {}, }); - const onStepFinish = vi.fn(); await runtime.runAgentLoop({ modelRole: 'default', @@ -398,7 +397,6 @@ describe('ClaudeCodeKtxLlmRuntime', () => { }, stepBudget: 1, telemetryTags: { operationName: 'test' }, - onStepFinish, }); const options = query.mock.calls[0][0].options; @@ -416,7 +414,6 @@ describe('ClaudeCodeKtxLlmRuntime', () => { behavior: 'deny', toolUseID: '2', }); - expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 1 }); }); it('treats host-discovered commands skills and agents as non-fatal init metadata for text and auth probe', async () => { @@ -664,108 +661,6 @@ describe('ClaudeCodeKtxLlmRuntime', () => { ); }); - it('counts only assistant turns the SDK counts toward num_turns', async () => { - const assistantMessage = ( - overrides: Partial> & { uuid: string }, - ): SDKMessage => - ({ - type: 'assistant', - message: { role: 'assistant', content: [], stop_reason: 'end_turn' }, - parent_tool_use_id: null, - session_id: 'session-id', - ...overrides, - }) as unknown as SDKMessage; - - const query = vi.fn((_input: any) => - stream([ - initMessage(), - assistantMessage({ - uuid: '00000000-0000-4000-8000-0000000000a1', - error: 'max_output_tokens', - }), - assistantMessage({ - uuid: '00000000-0000-4000-8000-0000000000a2', - message: { role: 'assistant', content: [], stop_reason: 'pause_turn' } as never, - }), - assistantMessage({ uuid: '00000000-0000-4000-8000-0000000000a3' }), - { - type: 'assistant', - message: { role: 'assistant', content: [], stop_reason: 'end_turn' }, - parent_tool_use_id: 'tool-use-1', - uuid: '00000000-0000-4000-8000-0000000000a4', - session_id: 'session-id', - } as unknown as SDKMessage, - resultMessage({ subtype: 'success', terminal_reason: 'completed' }), - ]), - ); - const runtime = new ClaudeCodeKtxLlmRuntime({ - projectDir: '/tmp/project', - modelSlots: { default: 'sonnet' }, - query, - env: {}, - }); - const onStepFinish = vi.fn(); - - await expect( - runtime.runAgentLoop({ - modelRole: 'default', - systemPrompt: 'system', - userPrompt: 'user', - toolSet: {}, - stepBudget: 40, - telemetryTags: { operationName: 'test' }, - onStepFinish, - }), - ).resolves.toMatchObject({ stopReason: 'natural' }); - - expect(onStepFinish).toHaveBeenCalledTimes(1); - expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 40 }); - }); - - it('logs and ignores onStepFinish callback errors', async () => { - const query = vi.fn((_input: any) => - stream([ - initMessage(), - { - type: 'assistant', - message: { role: 'assistant', content: [] }, - parent_tool_use_id: null, - uuid: '00000000-0000-4000-8000-000000000005', - session_id: 'session-id', - } as unknown as SDKMessage, - resultMessage({ subtype: 'success', terminal_reason: 'completed' }), - ]), - ); - const logger = { - debug: vi.fn(), - log: vi.fn(), - warn: vi.fn(), - error: vi.fn(), - }; - const runtime = new ClaudeCodeKtxLlmRuntime({ - projectDir: '/tmp/project', - modelSlots: { default: 'sonnet' }, - query, - env: {}, - logger, - }); - - await expect( - runtime.runAgentLoop({ - modelRole: 'default', - systemPrompt: 'system', - userPrompt: 'user', - toolSet: {}, - stepBudget: 1, - telemetryTags: { operationName: 'test' }, - onStepFinish: async () => { - throw new Error('callback exploded'); - }, - }), - ).resolves.toMatchObject({ stopReason: 'natural' }); - expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('callback exploded')); - }); - it('maps max-turn terminal reasons to budget', () => { expect(mapClaudeCodeStopReason(resultMessage({ subtype: 'error_max_turns' }))).toBe('budget'); expect(mapClaudeCodeStopReason(resultMessage({ terminal_reason: 'max_turns' }))).toBe('budget'); @@ -774,20 +669,14 @@ describe('ClaudeCodeKtxLlmRuntime', () => { expect(mapClaudeCodeStopReason(resultMessage({ subtype: 'error_during_execution' }))).toBe('error'); }); - it('returns loop metrics including step count and mapped token usage', async () => { + it('reports stepCount from the SDK result num_turns and mapped token usage', async () => { const query = vi.fn((_input: any) => stream([ initMessage(), - { - type: 'assistant', - message: { role: 'assistant', content: [] }, - parent_tool_use_id: null, - uuid: '00000000-0000-4000-8000-000000000006', - session_id: 'session-id', - } as unknown as SDKMessage, resultMessage({ subtype: 'success', terminal_reason: 'completed', + num_turns: 3, usage: { input_tokens: 50, output_tokens: 10 } as never, }), ]), @@ -808,8 +697,9 @@ describe('ClaudeCodeKtxLlmRuntime', () => { telemetryTags: { operationName: 'test' }, }); - expect(result.metrics?.stepCount).toBe(1); - expect(result.metrics?.stepBoundariesMs).toHaveLength(1); + // Authoritative SDK count, not a re-derived per-message tally. + expect(result.metrics?.stepCount).toBe(3); + expect(result.metrics?.stepBoundariesMs).toEqual([]); expect(result.metrics?.usage).toEqual({ inputTokens: 50, outputTokens: 10, totalTokens: 60 }); }); diff --git a/packages/cli/test/context/llm/codex-runtime.test.ts b/packages/cli/test/context/llm/codex-runtime.test.ts index 4c3fcdfd..591011df 100644 --- a/packages/cli/test/context/llm/codex-runtime.test.ts +++ b/packages/cli/test/context/llm/codex-runtime.test.ts @@ -294,7 +294,6 @@ describe('CodexKtxLlmRuntime', () => { runner: fakeRunner, startMcpServer, }); - const onStepFinish = vi.fn(); const result = await runtime.runAgentLoop({ modelRole: 'default', @@ -302,7 +301,6 @@ describe('CodexKtxLlmRuntime', () => { userPrompt: 'user', stepBudget: 5, telemetryTags: {}, - onStepFinish, toolSet: { aliased_wiki_tool: { name: 'wiki_search', @@ -315,7 +313,6 @@ describe('CodexKtxLlmRuntime', () => { expect(result.stopReason).toBe('natural'); expect(result.metrics).toMatchObject({ stepCount: 1, usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2 } }); - expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 5 }); expect(startMcpServer).toHaveBeenCalledWith({ projectDir: '/tmp/project', toolSet: expect.any(Object) }); expect(fakeRunner.runStreamed).toHaveBeenCalledWith( expect.objectContaining({ @@ -399,7 +396,6 @@ describe('CodexKtxLlmRuntime', () => { modelSlots: { default: 'codex' }, runner: fakeRunner, }); - const onStepFinish = vi.fn(); const result = await runtime.runAgentLoop({ modelRole: 'default', @@ -407,7 +403,6 @@ describe('CodexKtxLlmRuntime', () => { userPrompt: 'user', stepBudget: 1, telemetryTags: {}, - onStepFinish, toolSet: { first: { name: 'first', @@ -421,8 +416,6 @@ describe('CodexKtxLlmRuntime', () => { expect(result.stopReason).toBe('budget'); expect(result.error).toBeUndefined(); expect(result.metrics).toMatchObject({ stepCount: 1 }); - expect(onStepFinish).toHaveBeenCalledTimes(1); - expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 1 }); expect(fakeRunner.observedSignal()?.aborted).toBe(true); }); @@ -448,7 +441,6 @@ describe('CodexKtxLlmRuntime', () => { modelSlots: { default: 'codex' }, runner: fakeRunner, }); - const onStepFinish = vi.fn(); const result = await runtime.runAgentLoop({ modelRole: 'default', @@ -456,53 +448,15 @@ describe('CodexKtxLlmRuntime', () => { userPrompt: 'user', stepBudget: 2, telemetryTags: {}, - onStepFinish, toolSet: {}, }); expect(result.stopReason).toBe('budget'); expect(result.error).toBeUndefined(); expect(result.metrics).toMatchObject({ stepCount: 2 }); - expect(onStepFinish).toHaveBeenCalledTimes(2); - expect(onStepFinish).toHaveBeenLastCalledWith({ stepIndex: 2, stepBudget: 2 }); expect(fakeRunner.observedSignal()?.aborted).toBe(true); }); - it('fires onStepFinish live as each step completes, before the stream drains', async () => { - const order: string[] = []; - async function* liveEvents() { - yield { type: 'turn.started' }; - yield { type: 'item.completed', item: { type: 'mcp_tool_call', server: 'ktx', tool: 'a', status: 'completed' } }; - order.push('yielded-after-step-1'); - yield { type: 'item.completed', item: { type: 'mcp_tool_call', server: 'ktx', tool: 'b', status: 'completed' } }; - order.push('yielded-after-step-2'); - yield { type: 'item.completed', item: { type: 'agent_message', text: 'done' } }; - yield { type: 'turn.completed', usage: { input_tokens: 1, output_tokens: 1 } }; - } - const fakeRunner = { runStreamed: vi.fn(async () => liveEvents()) }; - const runtime = new CodexKtxLlmRuntime({ - projectDir: '/tmp/project', - modelSlots: { default: 'codex' }, - runner: fakeRunner, - }); - - const result = await runtime.runAgentLoop({ - modelRole: 'default', - systemPrompt: 'system', - userPrompt: 'user', - stepBudget: 10, - telemetryTags: {}, - onStepFinish: ({ stepIndex }) => { - order.push(`step-${stepIndex}`); - }, - toolSet: {}, - }); - - expect(result.stopReason).toBe('natural'); - expect(result.metrics).toMatchObject({ stepCount: 2 }); - expect(order).toEqual(['step-1', 'yielded-after-step-1', 'step-2', 'yielded-after-step-2']); - }); - it('surfaces the real Codex error event even when the SDK stream throws afterward', async () => { // The SDK yields the error/turn.failed events on stdout, then throws on the // non-zero exit. The masked exit message must not hide the real API error. diff --git a/packages/cli/test/demo-metrics.test.ts b/packages/cli/test/demo-metrics.test.ts index fcfe90c8..92857f07 100644 --- a/packages/cli/test/demo-metrics.test.ts +++ b/packages/cli/test/demo-metrics.test.ts @@ -27,13 +27,13 @@ function snapshot(events: MemoryFlowEvent[], overrides: Partial { - it('estimates elapsed, agent steps, tool calls, and cost from event stream', () => { + it('estimates elapsed, tool calls, and cost from event stream', () => { const start = Date.UTC(2026, 0, 1, 0, 0, 0); const input = snapshot( [ { type: 'source_acquired', adapter: 'live-database', trigger: 'demo_full', fileCount: 5, emittedAt: new Date(start).toISOString() }, - { type: 'work_unit_started', unitKey: 'orders', skills: [], stepBudget: 40, emittedAt: new Date(start + 1000).toISOString() }, - { type: 'work_unit_step', unitKey: 'orders', stepIndex: 6, stepBudget: 40, emittedAt: new Date(start + 6000).toISOString() }, + { type: 'work_unit_started', unitKey: 'orders', skills: [], emittedAt: new Date(start + 1000).toISOString() }, + { type: 'work_unit_step', unitKey: 'orders', toolCalls: 6, emittedAt: new Date(start + 6000).toISOString() }, ], { plannedWorkUnits: [ @@ -51,8 +51,6 @@ describe('buildDemoMetrics', () => { const metrics = buildDemoMetrics(input, { now: () => start + 10_000 }); expect(metrics.elapsedMs).toBe(10_000); - expect(metrics.agentSteps).toBe(6); - expect(metrics.agentStepBudget).toBe(40); expect(metrics.toolCalls).toBe(3); expect(metrics.workUnitsTotal).toBe(2); expect(metrics.estimatedTokens).toBeGreaterThan(0); @@ -71,7 +69,7 @@ describe('buildDemoMetrics', () => { const input = snapshot( [ { type: 'source_acquired', adapter: 'a', trigger: 't', fileCount: 1, emittedAt: new Date(start).toISOString() }, - { type: 'work_unit_started', unitKey: 'a', skills: [], stepBudget: 10, emittedAt: new Date(start + 1000).toISOString() }, + { type: 'work_unit_started', unitKey: 'a', skills: [], emittedAt: new Date(start + 1000).toISOString() }, { type: 'work_unit_finished', unitKey: 'a', status: 'success', emittedAt: new Date(start + 5000).toISOString() }, ], { diff --git a/packages/cli/test/ingest.test.ts b/packages/cli/test/ingest.test.ts index 4fc47d0c..21037e91 100644 --- a/packages/cli/test/ingest.test.ts +++ b/packages/cli/test/ingest.test.ts @@ -176,8 +176,8 @@ describe('runKtxIngest', () => { const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise => { input.memoryFlow?.emit({ type: 'source_acquired', adapter: 'fake', trigger: 'manual_resync', fileCount: 2 }); input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }); - input.memoryFlow?.emit({ type: 'work_unit_started', unitKey: 'orders', skills: [], stepBudget: 4 }); - input.memoryFlow?.emit({ type: 'work_unit_step', unitKey: 'orders', stepIndex: 2, stepBudget: 4 }); + input.memoryFlow?.emit({ type: 'work_unit_started', unitKey: 'orders', skills: [] }); + input.memoryFlow?.emit({ type: 'work_unit_step', unitKey: 'orders', toolCalls: 2 }); return completedLocalBundleRun(input, 'cli-local-progress-1'); }); const io = makeIo(); @@ -206,7 +206,7 @@ describe('runKtxIngest', () => { { percent: 15, message: 'Fetched 2 source files from fake' }, { percent: 45, message: 'Planned 2 tasks' }, expect.objectContaining({ - message: 'Processing tasks: 0/2 complete, 1 active; latest orders step 2/4', + message: 'Processing tasks: 0/2 complete, 1 active; latest orders · 2 actions', transient: true, }), ]), @@ -776,13 +776,11 @@ describe('runKtxIngest', () => { type: 'work_unit_started', unitKey: 'metabase-col-6', skills: ['sl_capture'], - stepBudget: 40, }); input.memoryFlow?.emit({ type: 'work_unit_step', unitKey: 'metabase-col-6', - stepIndex: 7, - stepBudget: 40, + toolCalls: 7, }); input.memoryFlow?.emit({ type: 'stage_progress', @@ -806,7 +804,6 @@ describe('runKtxIngest', () => { type: 'work_unit_started', unitKey: 'metabase-col-7', skills: ['sl_capture'], - stepBudget: 40, }); input.progress?.onMetabaseChildCompleted?.({ metabaseConnectionId: 'prod-metabase', @@ -831,8 +828,8 @@ describe('runKtxIngest', () => { { percent: 45, message: 'Planned 1 task' }, { percent: 55, message: 'Processing 1/1 tasks: metabase-col-6' }, { - percent: 60, - message: 'Processing tasks: 0/1 complete, 1 active; latest metabase-col-6 step 7/40', + percent: 55, + message: 'Processing tasks: 0/1 complete, 1 active; latest metabase-col-6 · 7 actions', transient: true, }, { percent: 81, message: 'Resolving text conflict for metabase-col-6' }, @@ -1733,7 +1730,6 @@ describe('runKtxIngest', () => { type: 'work_unit_started', unitKey: 'historic-sql-table-public-orders', skills: ['historic_sql_table_digest'], - stepBudget: 40, }); input.memoryFlow?.emit({ type: 'work_unit_finished', @@ -1856,13 +1852,11 @@ describe('runKtxIngest', () => { type: 'work_unit_started', unitKey: 'historic-sql-table-public-orders', skills: ['historic_sql_table_digest'], - stepBudget: 40, }); input.memoryFlow?.emit({ type: 'work_unit_step', unitKey: 'historic-sql-table-public-orders', - stepIndex: 7, - stepBudget: 40, + toolCalls: 7, }); input.memoryFlow?.emit({ type: 'work_unit_finished', @@ -1897,7 +1891,7 @@ describe('runKtxIngest', () => { expect(stderr).toContain('[45%] Planned 2 tasks'); expect(stderr).toContain('[55%] Processing 1/2 tasks: historic-sql-table-public-orders'); expect(stderr).toContain( - '\r[58%] Processing tasks: 0/2 complete, 1 active; latest historic-sql-table-public-orders step 7/40\u001b[K', + '\r[55%] Processing tasks: 0/2 complete, 1 active; latest historic-sql-table-public-orders · 7 actions\u001b[K', ); expect(stderr).toContain('[68%] Processed 1/2 tasks'); }); @@ -1954,11 +1948,10 @@ describe('runKtxIngest', () => { type: 'work_unit_started', unitKey, skills: ['historic_sql_table_digest'], - stepBudget: 40, }); } for (const unitKey of workUnitKeys) { - input.memoryFlow?.emit({ type: 'work_unit_step', unitKey, stepIndex: 1, stepBudget: 40 }); + input.memoryFlow?.emit({ type: 'work_unit_step', unitKey, toolCalls: 1 }); } input.memoryFlow?.finish('done'); return completedLocalBundleRun(input, input.jobId ?? 'historic-concurrent-progress-job'); @@ -1986,10 +1979,10 @@ describe('runKtxIngest', () => { const stderr = io.stderr(); expect(stderr).toContain( - '\r[56%] Processing tasks: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers step 1/40\u001b[K', + '\r[55%] Processing tasks: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers · 1 action\u001b[K', ); expect(stderr).not.toContain( - '\n[56%] Processing 6/6 tasks: historic-sql-table-public-suppliers step 1/40\n', + '\n[55%] Processing 6/6 tasks: historic-sql-table-public-suppliers · 1 action\n', ); expect(stderr).toContain('\n[100%] Ingest completed\n'); }); diff --git a/packages/cli/test/memory-flow-interactive.test.ts b/packages/cli/test/memory-flow-interactive.test.ts index befc7f01..998027d6 100644 --- a/packages/cli/test/memory-flow-interactive.test.ts +++ b/packages/cli/test/memory-flow-interactive.test.ts @@ -46,9 +46,9 @@ function replay(): MemoryFlowReplayInput { { type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 }, { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 }, { type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 4 }, + { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, - { type: 'work_unit_started', unitKey: 'customers', skills: ['wiki_capture'], stepBudget: 4 }, + { type: 'work_unit_started', unitKey: 'customers', skills: ['wiki_capture'] }, { type: 'work_unit_finished', unitKey: 'customers', status: 'failed', reason: 'validation reset' }, { type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 1 }, { type: 'saved', commitSha: 'abc12345', wikiCount: 1, slCount: 1 }, diff --git a/packages/cli/test/memory-flow-tui.test.tsx b/packages/cli/test/memory-flow-tui.test.tsx index 1bb38b72..87a5a96f 100644 --- a/packages/cli/test/memory-flow-tui.test.tsx +++ b/packages/cli/test/memory-flow-tui.test.tsx @@ -35,10 +35,10 @@ function replayInput(): MemoryFlowReplayInput { { type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 }, { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 }, { type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] }, { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, - { type: 'work_unit_started', unitKey: 'customers', skills: ['sl_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'customers', skills: ['sl_capture'] }, { type: 'candidate_action', unitKey: 'customers', target: 'sl', action: 'updated', key: 'orbit_demo.customers' }, { type: 'work_unit_finished', unitKey: 'customers', status: 'success' }, { type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 }, @@ -220,7 +220,7 @@ describe('MemoryFlowTuiApp', () => { { type: 'source_acquired', adapter: 'live-database', trigger: 'manual_resync', fileCount: 1 }, { type: 'diff_computed', added: 1, modified: 0, deleted: 0, unchanged: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] }, ], plannedWorkUnits: [{ unitKey: 'orders', rawFiles: ['orders'], peerFileCount: 0, dependencyCount: 1 }], }; @@ -240,7 +240,7 @@ describe('MemoryFlowTuiApp', () => { { type: 'source_acquired', adapter: 'dbt-descriptions', trigger: 'manual_resync', fileCount: 3 }, { type: 'diff_computed', added: 11, modified: 0, deleted: 0, unchanged: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, - { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, + { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] }, ], plannedWorkUnits: [{ unitKey: 'orders', rawFiles: ['orders'], peerFileCount: 0, dependencyCount: 1 }], };