fix(ingest): drive work-unit progress from tool calls, not turn counts (#269)

The ingest HUD showed "step 70/40" because the Claude subscription runtime
re-derived its own per-turn counter, which could not match the SDK's num_turns
and so overshot the maxTurns budget. Replace the turn-based work_unit_step
heartbeat with a real, observed tool-call count (no denominator), and report
metrics.stepCount from the SDK's authoritative num_turns. Delete the brittle
countsAsAssistantTurn denylist and the now-unused onStepFinish callback from
the runtime port and all three runtimes. Reconcile and curator progress now
use the same tool-call heartbeat.
This commit is contained in:
Andrey Avtomonov 2026-06-08 15:30:35 +02:00 committed by GitHub
parent 18245c2373
commit 2896f9fb91
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
26 changed files with 138 additions and 463 deletions

View file

@ -39,7 +39,6 @@ export interface CuratorPaginationInput {
buildUserPrompt: (input: CuratorPaginationPromptInput) => string; buildUserPrompt: (input: CuratorPaginationPromptInput) => string;
buildToolSet: (passNumber: number) => KtxRuntimeToolSet; buildToolSet: (passNumber: number) => KtxRuntimeToolSet;
getReconciliationActions: () => MemoryAction[]; getReconciliationActions: () => MemoryAction[];
onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void;
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
} }
@ -245,10 +244,6 @@ export class CuratorPaginationService implements CuratorPaginationPort {
jobId: params.input.jobId, jobId: params.input.jobId,
forceRun: params.forceRun, forceRun: params.forceRun,
abortSignal: params.input.abortSignal, abortSignal: params.input.abortSignal,
onStepFinish: params.input.onStepFinish
? ({ stepIndex, stepBudget }) =>
params.input.onStepFinish?.({ passNumber: params.passNumber, stepIndex, stepBudget })
: undefined,
}); });
} }

View file

@ -939,14 +939,13 @@ export class IngestBundleRunner {
workUnitSettings: { maxConcurrency: number; stepBudget: number; failureMode: 'abort' | 'continue' }; workUnitSettings: { maxConcurrency: number; stepBudget: number; failureMode: 'abort' | 'continue' };
transcriptDir: string; transcriptDir: string;
transcriptSummaries: Map<string, MutableToolTranscriptSummary>; transcriptSummaries: Map<string, MutableToolTranscriptSummary>;
recordTranscriptEntry(path: string): (entry: ToolCallLogEntry) => void; recordTranscriptEntry(path: string): (entry: ToolCallLogEntry) => MutableToolTranscriptSummary;
stageIndex: StageIndex; stageIndex: StageIndex;
includeContextEvidenceTools: boolean; includeContextEvidenceTools: boolean;
currentTableExists(tableRef: string): Promise<boolean>; currentTableExists(tableRef: string): Promise<boolean>;
memoryFlow?: MemoryFlowEventSink; memoryFlow?: MemoryFlowEventSink;
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
wuSkillNames: string[]; wuSkillNames: string[];
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
}): Promise<WorkUnitOutcome> { }): Promise<WorkUnitOutcome> {
const session: CaptureSession = { const session: CaptureSession = {
userId: 'system', userId: 'system',
@ -1050,7 +1049,6 @@ export class IngestBundleRunner {
type: 'work_unit_started', type: 'work_unit_started',
unitKey: input.wu.unitKey, unitKey: input.wu.unitKey,
skills: input.wuSkillNames, skills: input.wuSkillNames,
stepBudget: input.workUnitSettings.stepBudget,
}); });
return executeWorkUnit( return executeWorkUnit(
{ {
@ -1074,8 +1072,10 @@ export class IngestBundleRunner {
slIndex: input.slIndex, slIndex: input.slIndex,
priorProvenance: input.priorProvenance, priorProvenance: input.priorProvenance,
}), }),
buildToolSet: (wuInner) => buildToolSet: (wuInner) => {
wrapToolsWithLogger( const transcriptPath = join(input.transcriptDir, `${wuInner.unitKey}.jsonl`);
const record = input.recordTranscriptEntry(transcriptPath);
return wrapToolsWithLogger(
buildWuToolSet({ buildWuToolSet({
sourceKey: input.job.sourceKey, sourceKey: input.job.sourceKey,
stagedDir: input.stagedDir, stagedDir: input.stagedDir,
@ -1084,10 +1084,23 @@ export class IngestBundleRunner {
emitUnmappedFallbackTool: wuEmitUnmappedFallbackTool, emitUnmappedFallbackTool: wuEmitUnmappedFallbackTool,
toolsetTools: wuToolset.toRuntimeTools(wuToolContext), toolsetTools: wuToolset.toRuntimeTools(wuToolContext),
}), }),
join(input.transcriptDir, `${wuInner.unitKey}.jsonl`), transcriptPath,
wuInner.unitKey, wuInner.unitKey,
{ onEntry: input.recordTranscriptEntry(join(input.transcriptDir, `${wuInner.unitKey}.jsonl`)) }, {
), // Drive the live HUD heartbeat from real tool calls: each invocation
// ticks the running per-unit count. This is an observed signal, not a
// re-derived turn count, so it can never overshoot a budget.
onEntry: (entry) => {
const summary = record(entry);
input.memoryFlow?.emit({
type: 'work_unit_step',
unitKey: wuInner.unitKey,
toolCalls: summary.toolCallCount,
});
},
},
);
},
captureSession: session, captureSession: session,
sessionActions, sessionActions,
modelRole: 'candidateExtraction', modelRole: 'candidateExtraction',
@ -1096,7 +1109,6 @@ export class IngestBundleRunner {
connectionId: input.job.connectionId, connectionId: input.job.connectionId,
jobId: input.job.jobId, jobId: input.job.jobId,
toolFailureCount: (unitKey) => input.transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0, toolFailureCount: (unitKey) => input.transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0,
onStepFinish: input.onStepFinish,
abortSignal: input.abortSignal, abortSignal: input.abortSignal,
}, },
input.wu, input.wu,
@ -1166,11 +1178,12 @@ export class IngestBundleRunner {
const transcriptDir = this.deps.storage.resolveTranscriptDir(job.jobId); const transcriptDir = this.deps.storage.resolveTranscriptDir(job.jobId);
const recordTranscriptEntry = const recordTranscriptEntry =
(path: string) => (path: string) =>
(entry: ToolCallLogEntry): void => { (entry: ToolCallLogEntry): MutableToolTranscriptSummary => {
const current = const current =
transcriptSummaries.get(entry.wuKey) ?? createMutableToolTranscriptSummary(entry.wuKey, path); transcriptSummaries.get(entry.wuKey) ?? createMutableToolTranscriptSummary(entry.wuKey, path);
recordToolTranscriptEntry(current, entry); recordToolTranscriptEntry(current, entry);
transcriptSummaries.set(entry.wuKey, current); transcriptSummaries.set(entry.wuKey, current);
return current;
}; };
const overrideReport = await this.loadOverrideReport(job); const overrideReport = await this.loadOverrideReport(job);
@ -1639,9 +1652,6 @@ export class IngestBundleRunner {
abortSignal: ctx?.abortSignal, abortSignal: ctx?.abortSignal,
memoryFlow, memoryFlow,
wuSkillNames, wuSkillNames,
onStepFinish: ({ stepIndex, stepBudget }) => {
memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget });
},
}); });
}, },
}); });
@ -2013,6 +2023,45 @@ export class IngestBundleRunner {
let curatorWarnings: string[] = []; let curatorWarnings: string[] = [];
let reconcileOutcome: Awaited<ReturnType<typeof runReconciliationStage4>>; let reconcileOutcome: Awaited<ReturnType<typeof runReconciliationStage4>>;
// Reconcile shares the work-unit liveness model: the HUD heartbeat is driven
// by real tool calls (a monotonic, observed count), not a re-derived turn
// counter. The soft cap only paces the phase progress bar; it is never shown
// to the user, so it cannot read as a misleading "X/Y" fraction.
const reconcileTranscriptPath = join(transcriptDir, 'reconcile.jsonl');
const reconcileProgressSoftCap = 40;
const buildReconcileToolSetWithHeartbeat = (): KtxRuntimeToolSet => {
const record = recordTranscriptEntry(reconcileTranscriptPath);
return wrapToolsWithLogger(
buildReconcileToolSet({
loadSkillTool: rcLoadSkill,
stageListTool: rcStageListTool,
stageDiffTool: rcStageDiffTool,
evictionListTool: rcEvictionListTool,
emitConflictResolutionTool: rcEmitConflictResolutionTool,
emitEvictionDecisionTool: rcEmitEvictionDecisionTool,
emitArtifactResolutionTool: rcEmitArtifactResolutionTool,
emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool,
readRawSpanTool: rcRawSpanTool,
toolsetTools: rcToolset.toRuntimeTools(rcToolContext),
}),
reconcileTranscriptPath,
'reconcile',
{
onEntry: (entry) => {
const summary = record(entry);
if (!stage4) {
return;
}
const label = `Reconciling results · ${summary.toolCallCount} action${
summary.toolCallCount === 1 ? '' : 's'
}`;
emitStageProgress('reconciliation', 85, label, { transient: true });
void stage4.updateProgress(Math.min(0.95, summary.toolCallCount / reconcileProgressSoftCap), label);
},
},
);
};
const reconcileStartedAt = Date.now(); const reconcileStartedAt = Date.now();
const reconcileMode = contextReport && this.deps.curatorPagination ? 'curator' : 'single'; const reconcileMode = contextReport && this.deps.curatorPagination ? 'curator' : 'single';
if (contextReport && this.deps.curatorPagination) { if (contextReport && this.deps.curatorPagination) {
@ -2035,39 +2084,8 @@ export class IngestBundleRunner {
}), }),
buildUserPrompt: ({ summary, items, runState }) => buildUserPrompt: ({ summary, items, runState }) =>
buildReconcileUserPrompt(stageIndex, eviction, { summary, items }, reconcileNotes, runState), buildReconcileUserPrompt(stageIndex, eviction, { summary, items }, reconcileNotes, runState),
buildToolSet: (_passNumber) => buildToolSet: (_passNumber) => buildReconcileToolSetWithHeartbeat(),
wrapToolsWithLogger(
buildReconcileToolSet({
loadSkillTool: rcLoadSkill,
stageListTool: rcStageListTool,
stageDiffTool: rcStageDiffTool,
evictionListTool: rcEvictionListTool,
emitConflictResolutionTool: rcEmitConflictResolutionTool,
emitEvictionDecisionTool: rcEmitEvictionDecisionTool,
emitArtifactResolutionTool: rcEmitArtifactResolutionTool,
emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool,
readRawSpanTool: rcRawSpanTool,
toolsetTools: rcToolset.toRuntimeTools(rcToolContext),
}),
join(transcriptDir, 'reconcile.jsonl'),
'reconcile',
{ onEntry: recordTranscriptEntry(join(transcriptDir, 'reconcile.jsonl')) },
),
getReconciliationActions: () => reconcileActions, getReconciliationActions: () => reconcileActions,
onStepFinish: stage4
? ({ passNumber, stepIndex, stepBudget }) => {
emitStageProgress(
'reconciliation',
85,
`Reconciling results: pass ${passNumber} step ${stepIndex}/${stepBudget}`,
{ transient: true },
);
void stage4.updateProgress(
stepIndex / stepBudget,
`Reconciling results · pass ${passNumber} step ${stepIndex}`,
);
}
: undefined,
abortSignal: ctx?.abortSignal, abortSignal: ctx?.abortSignal,
}); });
curatorReport = curatorOutcome.report; curatorReport = curatorOutcome.report;
@ -2091,38 +2109,13 @@ export class IngestBundleRunner {
canonicalPins: relevantCanonicalPins, canonicalPins: relevantCanonicalPins,
}), }),
buildUserPrompt: (idx, ev) => buildReconcileUserPrompt(idx, ev, undefined, reconcileNotes), buildUserPrompt: (idx, ev) => buildReconcileUserPrompt(idx, ev, undefined, reconcileNotes),
buildToolSet: () => buildToolSet: () => buildReconcileToolSetWithHeartbeat(),
wrapToolsWithLogger(
buildReconcileToolSet({
loadSkillTool: rcLoadSkill,
stageListTool: rcStageListTool,
stageDiffTool: rcStageDiffTool,
evictionListTool: rcEvictionListTool,
emitConflictResolutionTool: rcEmitConflictResolutionTool,
emitEvictionDecisionTool: rcEmitEvictionDecisionTool,
emitArtifactResolutionTool: rcEmitArtifactResolutionTool,
emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool,
readRawSpanTool: rcRawSpanTool,
toolsetTools: rcToolset.toRuntimeTools(rcToolContext),
}),
join(transcriptDir, 'reconcile.jsonl'),
'reconcile',
{ onEntry: recordTranscriptEntry(join(transcriptDir, 'reconcile.jsonl')) },
),
modelRole: 'reconcile', modelRole: 'reconcile',
stepBudget: 60, stepBudget: 60,
sourceKey: job.sourceKey, sourceKey: job.sourceKey,
jobId: job.jobId, jobId: job.jobId,
force: !!overrideReport, force: !!overrideReport,
abortSignal: ctx?.abortSignal, abortSignal: ctx?.abortSignal,
onStepFinish: stage4
? ({ stepIndex, stepBudget }) => {
emitStageProgress('reconciliation', 85, `Reconciling results: step ${stepIndex}/${stepBudget}`, {
transient: true,
});
void stage4.updateProgress(stepIndex / stepBudget, `Reconciling results · step ${stepIndex}`);
}
: undefined,
}); });
} }
await runTrace.event( await runTrace.event(

View file

@ -174,7 +174,7 @@ export function ingestReportToMemoryFlowReplay(
const actions = allReportActions(report); const actions = allReportActions(report);
const workUnitEvents: MemoryFlowEvent[] = report.body.workUnits.flatMap((workUnit) => [ const workUnitEvents: MemoryFlowEvent[] = report.body.workUnits.flatMap((workUnit) => [
{ type: 'work_unit_started', unitKey: workUnit.unitKey, skills: [], stepBudget: 0 } satisfies MemoryFlowEvent, { type: 'work_unit_started', unitKey: workUnit.unitKey, skills: [] } satisfies MemoryFlowEvent,
...workUnit.actions.map( ...workUnit.actions.map(
(action): MemoryFlowEvent => ({ (action): MemoryFlowEvent => ({
type: 'candidate_action', type: 'candidate_action',

View file

@ -81,13 +81,11 @@ const memoryFlowEventSchema = z.discriminatedUnion('type', [
type: z.literal('work_unit_started'), type: z.literal('work_unit_started'),
unitKey: z.string().min(1), unitKey: z.string().min(1),
skills: z.array(z.string().min(1)), skills: z.array(z.string().min(1)),
stepBudget: z.number().int().min(0),
}), }),
eventSchema({ eventSchema({
type: z.literal('work_unit_step'), type: z.literal('work_unit_step'),
unitKey: z.string().min(1), unitKey: z.string().min(1),
stepIndex: z.number().int().min(0), toolCalls: z.number().int().min(0),
stepBudget: z.number().int().min(0),
}), }),
eventSchema({ eventSchema({
type: z.literal('candidate_action'), type: z.literal('candidate_action'),

View file

@ -71,13 +71,11 @@ type MemoryFlowEventPayload =
type: 'work_unit_started'; type: 'work_unit_started';
unitKey: string; unitKey: string;
skills: string[]; skills: string[];
stepBudget: number;
} }
| { | {
type: 'work_unit_step'; type: 'work_unit_step';
unitKey: string; unitKey: string;
stepIndex: number; toolCalls: number;
stepBudget: number;
} }
| { | {
type: 'candidate_action'; type: 'candidate_action';

View file

@ -324,7 +324,6 @@ export interface CuratorPaginationPort {
}) => string; }) => string;
buildToolSet: (passNumber: number) => KtxRuntimeToolSet; buildToolSet: (passNumber: number) => KtxRuntimeToolSet;
getReconciliationActions: () => MemoryAction[]; getReconciliationActions: () => MemoryAction[];
onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void;
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
}): Promise<ReconciliationOutcome & { report: CuratorPaginationReport; warnings: string[] }>; }): Promise<ReconciliationOutcome & { report: CuratorPaginationReport; warnings: string[] }>;
} }

View file

@ -28,7 +28,6 @@ export interface WorkUnitExecutionDeps {
sourceKey: string; sourceKey: string;
connectionId: string; connectionId: string;
jobId: string; jobId: string;
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
toolFailureCount?: (unitKey: string) => number; toolFailureCount?: (unitKey: string) => number;
} }
@ -107,7 +106,6 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit)
unitKey: wu.unitKey, unitKey: wu.unitKey,
jobId: deps.jobId, jobId: deps.jobId,
}, },
onStepFinish: deps.onStepFinish,
abortSignal: deps.abortSignal, abortSignal: deps.abortSignal,
}); });
} catch (error) { } catch (error) {

View file

@ -15,7 +15,6 @@ export interface ReconciliationContext {
sourceKey: string; sourceKey: string;
jobId: string; jobId: string;
force?: boolean; force?: boolean;
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
forceRun?: boolean; forceRun?: boolean;
} }
@ -40,7 +39,6 @@ export async function runReconciliationStage4(ctx: ReconciliationContext): Promi
toolSet: ctx.buildToolSet(), toolSet: ctx.buildToolSet(),
stepBudget: ctx.stepBudget, stepBudget: ctx.stepBudget,
telemetryTags: { operationName: 'ingest-bundle-reconcile', source: ctx.sourceKey, jobId: ctx.jobId }, telemetryTags: { operationName: 'ingest-bundle-reconcile', source: ctx.sourceKey, jobId: ctx.jobId },
onStepFinish: ctx.onStepFinish,
abortSignal: ctx.abortSignal, abortSignal: ctx.abortSignal,
}); });
return { skipped: false, stopReason: run.stopReason, error: run.error, ...(run.metrics ? { metrics: run.metrics } : {}) }; return { skipped: false, stopReason: run.stopReason, error: run.error, ...(run.metrics ? { metrics: run.metrics } : {}) };

View file

@ -322,21 +322,11 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
messages: promptMessages.messages, messages: promptMessages.messages,
tools: built.tools as ToolSet, tools: built.tools as ToolSet,
...(params.abortSignal ? { abortSignal: params.abortSignal } : {}), ...(params.abortSignal ? { abortSignal: params.abortSignal } : {}),
onStepFinish: async () => { // Count model round-trips locally for metrics. `stepCountIs(stepBudget)`
// caps the loop, so this counter never exceeds the budget.
onStepFinish: () => {
stepIndex += 1; stepIndex += 1;
stepBoundariesMs.push(Date.now() - startedAt); stepBoundariesMs.push(Date.now() - startedAt);
if (!params.onStepFinish) {
return;
}
try {
await params.onStepFinish({ stepIndex, stepBudget: params.stepBudget });
} catch (err) {
this.logger.warn(
`[agent-runner] onStepFinish callback threw; ignoring: ${
err instanceof Error ? err.message : String(err)
}`,
);
}
}, },
}; };
const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), params.abortSignal, () => generateText(request)); const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), params.abortSignal, () => generateText(request));

View file

@ -6,7 +6,6 @@ import {
type SDKResultMessage, type SDKResultMessage,
} from '@anthropic-ai/claude-agent-sdk'; } from '@anthropic-ai/claude-agent-sdk';
import { z } from 'zod'; import { z } from 'zod';
import { noopLogger, type KtxLogger } from '../../context/core/config.js';
import { createAbortError, isAbortError, throwIfAborted } from '../core/abort.js'; import { createAbortError, isAbortError, throwIfAborted } from '../core/abort.js';
import { createKtxClaudeCodeEnv } from './claude-code-env.js'; import { createKtxClaudeCodeEnv } from './claude-code-env.js';
import { resolveClaudeCodeModel } from './claude-code-models.js'; import { resolveClaudeCodeModel } from './claude-code-models.js';
@ -53,7 +52,6 @@ export interface ClaudeCodeKtxLlmRuntimeDeps {
modelSlots: { default: string } & Partial<Record<string, string>>; modelSlots: { default: string } & Partial<Record<string, string>>;
query?: QueryFn; query?: QueryFn;
env?: NodeJS.ProcessEnv; env?: NodeJS.ProcessEnv;
logger?: KtxLogger;
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>; rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
} }
@ -85,22 +83,6 @@ function isResult(message: SDKMessage): message is SDKResultMessage {
return message.type === 'result'; return message.type === 'result';
} }
// Skip emissions the SDK does not count toward `num_turns`: `pause_turn` continuations and
// errored partials (e.g. `max_output_tokens`) it retries internally. Without this, the
// runtime's step counter outruns `maxTurns` and the HUD renders e.g. `step 69/40`.
function countsAsAssistantTurn(message: SDKMessage): boolean {
if (message.type !== 'assistant' || message.parent_tool_use_id !== null) {
return false;
}
if (message.error !== undefined) {
return false;
}
if (message.message.stop_reason === 'pause_turn') {
return false;
}
return true;
}
function resultError(result: SDKResultMessage): Error | undefined { function resultError(result: SDKResultMessage): Error | undefined {
if (result.subtype === 'success') { if (result.subtype === 'success') {
return undefined; return undefined;
@ -296,7 +278,6 @@ async function collectResult(params: {
options: Options; options: Options;
allowedToolIds: Set<string>; allowedToolIds: Set<string>;
expectedMcpServerNames: Set<string>; expectedMcpServerNames: Set<string>;
onAssistantTurn?: () => Promise<void>;
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>; rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
}): Promise<ClaudeQueryOutcome> { }): Promise<ClaudeQueryOutcome> {
@ -321,9 +302,6 @@ async function collectResult(params: {
params.rateLimitGovernor?.report(rateLimitSignal); params.rateLimitGovernor?.report(rateLimitSignal);
} }
assertInitIsolation(message, params.allowedToolIds, params.expectedMcpServerNames); assertInitIsolation(message, params.allowedToolIds, params.expectedMcpServerNames);
if (countsAsAssistantTurn(message)) {
await params.onAssistantTurn?.();
}
if (isResult(message)) { if (isResult(message)) {
result = message; result = message;
} }
@ -358,11 +336,9 @@ async function collectResultWithRateLimitRetry(params: Parameters<typeof collect
export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
private readonly runQuery: QueryFn; private readonly runQuery: QueryFn;
private readonly logger: KtxLogger;
constructor(private readonly deps: ClaudeCodeKtxLlmRuntimeDeps) { constructor(private readonly deps: ClaudeCodeKtxLlmRuntimeDeps) {
this.runQuery = deps.query ?? defaultQuery; this.runQuery = deps.query ?? defaultQuery;
this.logger = deps.logger ?? noopLogger;
} }
async generateText(input: KtxGenerateTextInput): Promise<string> { async generateText(input: KtxGenerateTextInput): Promise<string> {
@ -434,9 +410,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
} }
async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> { async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> {
let stepIndex = 0;
const startedAt = Date.now(); const startedAt = Date.now();
const stepBoundariesMs: number[] = [];
try { try {
const options = baseOptions({ const options = baseOptions({
projectDir: this.deps.projectDir, projectDir: this.deps.projectDir,
@ -453,22 +427,6 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
expectedMcpServerNames: expectedMcpServerNames(params.toolSet), expectedMcpServerNames: expectedMcpServerNames(params.toolSet),
rateLimitGovernor: this.deps.rateLimitGovernor, rateLimitGovernor: this.deps.rateLimitGovernor,
abortSignal: params.abortSignal, abortSignal: params.abortSignal,
onAssistantTurn: async () => {
stepIndex += 1;
stepBoundariesMs.push(Date.now() - startedAt);
if (!params.onStepFinish) {
return;
}
try {
await params.onStepFinish({ stepIndex, stepBudget: params.stepBudget });
} catch (error) {
this.logger.warn(
`[claude-code-runner] onStepFinish callback threw; ignoring: ${
error instanceof Error ? error.message : String(error)
}`,
);
}
},
}); });
const stopReason = mapClaudeCodeStopReason(result); const stopReason = mapClaudeCodeStopReason(result);
const error = resultError(result); const error = resultError(result);
@ -477,8 +435,12 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
...(stopReason === 'error' && error ? { error } : {}), ...(stopReason === 'error' && error ? { error } : {}),
metrics: { metrics: {
totalMs: Date.now() - startedAt, totalMs: Date.now() - startedAt,
stepCount: stepIndex, // Authoritative turn count from the SDK result. The runtime no longer
stepBoundariesMs, // re-derives a per-turn counter: it could not match the SDK's `num_turns`
// and overshot `maxTurns` (the source of the misleading `step 70/40`).
// Per-step boundaries require that counter and are not consumed anywhere.
stepCount: result.num_turns,
stepBoundariesMs: [],
usage: claudeTokenUsage(result), usage: claudeTokenUsage(result),
}, },
}; };
@ -490,7 +452,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
return { return {
stopReason: 'error', stopReason: 'error',
error: err, error: err,
metrics: { totalMs: Date.now() - startedAt, stepCount: stepIndex, stepBoundariesMs, usage: {} }, metrics: { totalMs: Date.now() - startedAt, stepCount: 0, stepBoundariesMs: [], usage: {} },
}; };
} }
} }

View file

@ -1,5 +1,4 @@
import { z } from 'zod'; import { z } from 'zod';
import { noopLogger, type KtxLogger } from '../core/config.js';
import { isAbortError, linkAbortSignal } from '../core/abort.js'; import { isAbortError, linkAbortSignal } from '../core/abort.js';
import { isCompletedAgentStep, summarizeCodexExecEvents, type CodexExecEventSummary } from './codex-exec-events.js'; import { isCompletedAgentStep, summarizeCodexExecEvents, type CodexExecEventSummary } from './codex-exec-events.js';
import { import {
@ -25,7 +24,6 @@ export interface CodexKtxLlmRuntimeDeps {
modelSlots: { default: string } & Partial<Record<string, string>>; modelSlots: { default: string } & Partial<Record<string, string>>;
runner?: CodexSdkRunner; runner?: CodexSdkRunner;
startMcpServer?: (input: { projectDir: string; toolSet: KtxRuntimeToolSet }) => Promise<CodexRuntimeMcpServerHandle>; startMcpServer?: (input: { projectDir: string; toolSet: KtxRuntimeToolSet }) => Promise<CodexRuntimeMcpServerHandle>;
logger?: KtxLogger;
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>; rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
} }
@ -40,7 +38,6 @@ function promptWithSystem(system: string | undefined, prompt: string): string {
interface CollectCodexEventsOptions { interface CollectCodexEventsOptions {
stepBudget?: number; stepBudget?: number;
abortController?: AbortController; abortController?: AbortController;
onStep?: (stepIndex: number) => void | Promise<void>;
} }
interface CollectCodexEventsResult { interface CollectCodexEventsResult {
@ -58,8 +55,8 @@ function isTurnCompleted(event: unknown): boolean {
} }
/** /**
* Drains the Codex stream once, emitting a step as each agent action completes * Drains the Codex stream once, counting each completed agent action so the
* so callers see live progress and the step budget is enforced mid-run. Every * step budget is enforced mid-run. Every
* completed agent-action item counts (see {@link isCompletedAgentStep}), so * completed agent-action item counts (see {@link isCompletedAgentStep}), so
* built-in `command_execution` steps decrement the budget the same as * built-in `command_execution` steps decrement the budget the same as
* `mcp_tool_call`s. A turn that produced no actions still counts as one step, * `mcp_tool_call`s. A turn that produced no actions still counts as one step,
@ -93,7 +90,6 @@ async function collectEvents(
} }
completedSteps += 1; completedSteps += 1;
await options.onStep?.(completedSteps);
if (isActionStep && options.stepBudget !== undefined && completedSteps >= options.stepBudget) { if (isActionStep && options.stepBudget !== undefined && completedSteps >= options.stepBudget) {
budgetExceeded = true; budgetExceeded = true;
options.abortController?.abort(); options.abortController?.abort();
@ -170,11 +166,9 @@ function isCodexRateLimitError(error: Error | undefined): boolean {
export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
private readonly runner: CodexSdkRunner; private readonly runner: CodexSdkRunner;
private readonly logger: KtxLogger;
constructor(private readonly deps: CodexKtxLlmRuntimeDeps) { constructor(private readonly deps: CodexKtxLlmRuntimeDeps) {
this.runner = deps.runner ?? new CodexSdkCliRunner(); this.runner = deps.runner ?? new CodexSdkCliRunner();
this.logger = deps.logger ?? noopLogger;
} }
private async runWithRateLimitRetry<T>( private async runWithRateLimitRetry<T>(
@ -328,15 +322,6 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
} }
: {}), : {}),
}); });
const onStep = async (stepIndex: number): Promise<void> => {
try {
await params.onStepFinish?.({ stepIndex, stepBudget: params.stepBudget });
} catch (error) {
this.logger.warn(
`[codex-runner] onStepFinish callback threw; ignoring: ${error instanceof Error ? error.message : String(error)}`,
);
}
};
const result = await this.runWithRateLimitRetry( const result = await this.runWithRateLimitRetry(
params.abortSignal, params.abortSignal,
async () => { async () => {
@ -352,7 +337,7 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
env: config.env, env: config.env,
signal: abortController.signal, signal: abortController.signal,
}), }),
{ stepBudget: params.stepBudget, abortController, onStep }, { stepBudget: params.stepBudget, abortController },
); );
const summary = summarizeCodexExecEvents(collected.events, { startedAt }); const summary = summarizeCodexExecEvents(collected.events, { startedAt });
return { collected, summary }; return { collected, summary };

View file

@ -17,12 +17,6 @@ export type KtxRuntimeToolSet = Record<string, KtxRuntimeToolDescriptor>;
export type RunLoopStopReason = 'budget' | 'natural' | 'error'; export type RunLoopStopReason = 'budget' | 'natural' | 'error';
/** @internal */
export interface RunLoopStepInfo {
stepIndex: number;
stepBudget: number;
}
export interface LlmTokenUsage { export interface LlmTokenUsage {
inputTokens?: number; inputTokens?: number;
outputTokens?: number; outputTokens?: number;
@ -48,7 +42,6 @@ export interface RunLoopParams {
toolSet: KtxRuntimeToolSet; toolSet: KtxRuntimeToolSet;
stepBudget: number; stepBudget: number;
telemetryTags: Record<string, string>; telemetryTags: Record<string, string>;
onStepFinish?: (info: RunLoopStepInfo) => void | Promise<void>;
abortSignal?: AbortSignal; abortSignal?: AbortSignal;
} }

View file

@ -15,8 +15,6 @@ interface DemoMetricsTuning {
interface DemoMetricsSnapshot { interface DemoMetricsSnapshot {
elapsedMs: number; elapsedMs: number;
etaMs: number | null; etaMs: number | null;
agentSteps: number;
agentStepBudget: number;
toolCalls: number; toolCalls: number;
workUnitsStarted: number; workUnitsStarted: number;
workUnitsFinished: number; workUnitsFinished: number;
@ -37,18 +35,6 @@ function eventsOf<T extends MemoryFlowEvent['type']>(
return events.filter((event): event is Extract<MemoryFlowEvent, { type: T }> => event.type === type); return events.filter((event): event is Extract<MemoryFlowEvent, { type: T }> => event.type === type);
} }
function maxAgentStep(events: MemoryFlowEvent[]): { step: number; budget: number } {
const steps = eventsOf(events, 'work_unit_step');
const started = eventsOf(events, 'work_unit_started');
const stepIndex = steps.reduce((max, event) => Math.max(max, event.stepIndex), 0);
const stepBudget = Math.max(
0,
...steps.map((event) => event.stepBudget),
...started.map((event) => event.stepBudget),
);
return { step: stepIndex, budget: stepBudget };
}
function totalToolCalls(input: MemoryFlowReplayInput): number { function totalToolCalls(input: MemoryFlowReplayInput): number {
return input.details.transcripts.reduce((total, transcript) => total + transcript.toolCallCount, 0); return input.details.transcripts.reduce((total, transcript) => total + transcript.toolCallCount, 0);
} }
@ -96,11 +82,10 @@ export function buildDemoMetrics(
const nowMs = (options.now ?? Date.now)(); const nowMs = (options.now ?? Date.now)();
const elapsedMs = elapsedMsFromEvents(input.events, nowMs); const elapsedMs = elapsedMsFromEvents(input.events, nowMs);
const { step, budget } = maxAgentStep(input.events);
const toolCalls = totalToolCalls(input); const toolCalls = totalToolCalls(input);
const progress = workUnitProgress(input); const progress = workUnitProgress(input);
const finishedCount = eventsOf(input.events, 'work_unit_finished').length; const finishedCount = eventsOf(input.events, 'work_unit_finished').length;
const stepDriver = Math.max(step, toolCalls, finishedCount * 4); const stepDriver = Math.max(toolCalls, finishedCount * 4);
const inputTokens = stepDriver * inputTokensPerStep; const inputTokens = stepDriver * inputTokensPerStep;
const outputTokens = stepDriver * outputTokensPerStep; const outputTokens = stepDriver * outputTokensPerStep;
@ -113,8 +98,6 @@ export function buildDemoMetrics(
return { return {
elapsedMs, elapsedMs,
etaMs: estimateEtaMs(elapsedMs, progress.finished, progress.total, input.status), etaMs: estimateEtaMs(elapsedMs, progress.finished, progress.total, input.status),
agentSteps: step,
agentStepBudget: budget,
toolCalls, toolCalls,
workUnitsStarted: progress.started, workUnitsStarted: progress.started,
workUnitsFinished: progress.finished, workUnitsFinished: progress.finished,

View file

@ -398,9 +398,8 @@ function plainIngestEventProgress(
const total = plannedWorkUnitCountThrough(snapshot, eventIndex); const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
const completed = completedWorkUnitCountThrough(snapshot, eventIndex); const completed = completedWorkUnitCountThrough(snapshot, eventIndex);
const active = activeWorkUnitCountThrough(snapshot, eventIndex); const active = activeWorkUnitCountThrough(snapshot, eventIndex);
const stepFraction = event.stepBudget > 0 ? Math.min(1, event.stepIndex / event.stepBudget) : 0; const percent = total > 0 ? 55 + Math.ceil((completed / total) * 25) : 55;
const percent = total > 0 ? 55 + Math.ceil(((completed + stepFraction) / total) * 25) : 55; const latest = `${event.unitKey} · ${pluralize(event.toolCalls, 'action')}`;
const latest = `${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`;
return { return {
percent, percent,
message: `Processing tasks: ${completed}/${total} complete, ${active} active; latest ${latest}`, message: `Processing tasks: ${completed}/${total} complete, ${active} active; latest ${latest}`,

View file

@ -139,31 +139,21 @@ function sourceDescription(input: MemoryFlowReplayInput): SourceInfo {
return { type: info.type, name: conn, sourceCount: count, itemNounPlural: info.plural, readingVerb: info.verb, ingestDescription: info.description }; return { type: info.type, name: conn, sourceCount: count, itemNounPlural: info.plural, readingVerb: info.verb, ingestDescription: info.description };
} }
function activeWorkUnits( function activeWorkUnits(input: MemoryFlowReplayInput): string[] {
input: MemoryFlowReplayInput,
): Array<{ unitKey: string; stepIndex: number; stepBudget: number }> {
const finishedKeys = new Set<string>(); const finishedKeys = new Set<string>();
const unitMap = new Map<string, { stepIndex: number; stepBudget: number }>();
for (const e of input.events) { for (const e of input.events) {
if (e.type === 'work_unit_started') {
unitMap.set(e.unitKey, { stepIndex: 0, stepBudget: e.stepBudget });
}
if (e.type === 'work_unit_step') {
const existing = unitMap.get(e.unitKey);
if (existing) {
existing.stepIndex = e.stepIndex;
existing.stepBudget = e.stepBudget;
}
}
if (e.type === 'work_unit_finished') finishedKeys.add(e.unitKey); if (e.type === 'work_unit_finished') finishedKeys.add(e.unitKey);
} }
const result: Array<{ unitKey: string; stepIndex: number; stepBudget: number }> = []; const active: string[] = [];
for (const [unitKey, data] of unitMap) { const seen = new Set<string>();
if (!finishedKeys.has(unitKey)) result.push({ unitKey, ...data }); for (const e of input.events) {
if (e.type === 'work_unit_started' && !finishedKeys.has(e.unitKey) && !seen.has(e.unitKey)) {
seen.add(e.unitKey);
active.push(e.unitKey);
}
} }
return result; return active;
} }
function queuedWorkUnits(input: MemoryFlowReplayInput): string[] { function queuedWorkUnits(input: MemoryFlowReplayInput): string[] {

View file

@ -904,7 +904,11 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
}); });
deps.agentRunner.runLoop.mockImplementation(async (params: any) => { deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.telemetryTags.operationName === 'ingest-bundle-wu') { if (params.telemetryTags.operationName === 'ingest-bundle-wu') {
await params.onStepFinish?.({ stepIndex: 1, stepBudget: params.stepBudget }); // A real tool call drives the live work_unit_step heartbeat.
await params.toolSet.record_verification_ledger.execute(
{ summary: 'Captured order context.', verifiedIdentifiers: [], unverifiedIdentifiers: [] },
{ toolCallId: 'ledger-1', messages: [] },
);
currentToolSession.actions.push({ currentToolSession.actions.push({
target: 'wiki', target: 'wiki',
type: 'created', type: 'created',
@ -948,9 +952,8 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
type: 'work_unit_started', type: 'work_unit_started',
unitKey: 'u1', unitKey: 'u1',
skills: ['ingest_triage', 'sl_capture', 'wiki_capture'], skills: ['ingest_triage', 'sl_capture', 'wiki_capture'],
stepBudget: 40,
}), }),
expect.objectContaining({ type: 'work_unit_step', unitKey: 'u1', stepIndex: 1, stepBudget: 40 }), expect.objectContaining({ type: 'work_unit_step', unitKey: 'u1', toolCalls: 1 }),
expect.objectContaining({ expect.objectContaining({
type: 'candidate_action', type: 'candidate_action',
unitKey: 'u1', unitKey: 'u1',
@ -2226,22 +2229,7 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
it('emits a monotonically non-decreasing progress sequence reaching 1.0, covering all 7 stages', async () => { it('emits a monotonically non-decreasing progress sequence reaching 1.0, covering all 7 stages', async () => {
const deps = makeDeps(); const deps = makeDeps();
// Simulate an agent that calls onStepFinish a few times so stage 3 and 4 emit per-step progress. deps.agentRunner.runLoop.mockImplementation(async () => ({ stopReason: 'natural' }));
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.onStepFinish) {
for (let i = 1; i <= 3; i++) {
await params.onStepFinish({ stepIndex: i, stepBudget: params.stepBudget });
}
}
return { stopReason: 'natural' };
});
// Trigger Stage 4 reconciliation by having at least one action.
deps.agentRunner.runLoop.mockImplementation(async (params: any) => {
if (params.onStepFinish) {
await params.onStepFinish({ stepIndex: 1, stepBudget: params.stepBudget });
}
return { stopReason: 'natural' };
});
const runner = buildRunner(deps); const runner = buildRunner(deps);
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({

View file

@ -16,11 +16,11 @@ function baseScenario(overrides: Partial<MemoryFlowReplayInput> = {}): MemoryFlo
{ type: 'raw_snapshot_written', syncId: 'sync-success', rawFileCount: 4 }, { type: 'raw_snapshot_written', syncId: 'sync-success', rawFileCount: 4 },
{ type: 'diff_computed', added: 2, modified: 1, deleted: 0, unchanged: 1 }, { type: 'diff_computed', added: 2, modified: 1, deleted: 0, unchanged: 1 },
{ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
{ type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/global/orders.md' }, { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/global/orders.md' },
{ type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' }, { type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' },
{ type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' },
{ type: 'work_unit_started', unitKey: 'revenue', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'revenue', skills: ['wiki_capture'] },
{ type: 'candidate_action', unitKey: 'revenue', target: 'wiki', action: 'updated', key: 'wiki/global/revenue.md' }, { type: 'candidate_action', unitKey: 'revenue', target: 'wiki', action: 'updated', key: 'wiki/global/revenue.md' },
{ type: 'work_unit_finished', unitKey: 'revenue', status: 'success' }, { type: 'work_unit_finished', unitKey: 'revenue', status: 'success' },
{ type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 }, { type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 },
@ -111,7 +111,7 @@ export function validationRevertScenario(): MemoryFlowReplayInput {
{ type: 'raw_snapshot_written', syncId: 'sync-validation', rawFileCount: 1 }, { type: 'raw_snapshot_written', syncId: 'sync-validation', rawFileCount: 1 },
{ type: 'diff_computed', added: 1, modified: 0, deleted: 0, unchanged: 0 }, { type: 'diff_computed', added: 1, modified: 0, deleted: 0, unchanged: 0 },
{ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
{ type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' }, { type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' },
{ {
type: 'work_unit_finished', type: 'work_unit_finished',

View file

@ -22,8 +22,8 @@ function snapshot(overrides: Partial<MemoryFlowReplayInput> = {}): MemoryFlowRep
{ type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 }, { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 },
{ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 },
{ type: 'stage_progress', stage: 'integration', percent: 80, message: 'Integrating 1/1 patches: orders' }, { type: 'stage_progress', stage: 'integration', percent: 80, message: 'Integrating 1/1 patches: orders' },
{ type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
{ type: 'work_unit_step', unitKey: 'orders', stepIndex: 1, stepBudget: 40 }, { type: 'work_unit_step', unitKey: 'orders', toolCalls: 1 },
{ type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' }, { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' },
{ type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' },
{ type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 }, { type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 },

View file

@ -60,7 +60,7 @@ function replayInput(): MemoryFlowReplayInput {
{ type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 }, { type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 },
{ type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 3 }, { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 3 },
{ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
{ type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' }, { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' },
{ type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' }, { type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' },
{ type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' },
@ -159,7 +159,7 @@ describe('buildMemoryFlowViewModel', () => {
{ type: 'source_acquired', adapter: 'looker', trigger: 'demo_seeded', fileCount: 7 }, { type: 'source_acquired', adapter: 'looker', trigger: 'demo_seeded', fileCount: 7 },
{ type: 'source_acquired', adapter: 'notion', trigger: 'demo_seeded', fileCount: 8 }, { type: 'source_acquired', adapter: 'notion', trigger: 'demo_seeded', fileCount: 8 },
{ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'revenue-and-contracts', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'revenue-and-contracts', skills: ['wiki_capture'] },
{ {
type: 'candidate_action', type: 'candidate_action',
unitKey: 'revenue-and-contracts', unitKey: 'revenue-and-contracts',
@ -376,7 +376,7 @@ describe('buildMemoryFlowViewModel', () => {
{ type: 'raw_snapshot_written', syncId: 'sync-errors', rawFileCount: 2 }, { type: 'raw_snapshot_written', syncId: 'sync-errors', rawFileCount: 2 },
{ type: 'diff_computed', added: 2, modified: 0, deleted: 0, unchanged: 0 }, { type: 'diff_computed', added: 2, modified: 0, deleted: 0, unchanged: 0 },
{ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
{ type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' }, { type: 'candidate_action', unitKey: 'orders', target: 'sl', action: 'updated', key: 'warehouse.orders' },
{ {
type: 'work_unit_finished', type: 'work_unit_finished',
@ -402,7 +402,7 @@ describe('buildMemoryFlowViewModel', () => {
events: [ events: [
{ type: 'source_acquired', adapter: 'metricflow', trigger: 'manual_resync', fileCount: 1 }, { type: 'source_acquired', adapter: 'metricflow', trigger: 'manual_resync', fileCount: 1 },
{ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'docs', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'docs', skills: ['wiki_capture'] },
{ type: 'work_unit_finished', unitKey: 'docs', status: 'failed', reason: 'agent step budget exhausted' }, { type: 'work_unit_finished', unitKey: 'docs', status: 'failed', reason: 'agent step budget exhausted' },
], ],
plannedWorkUnits: [{ unitKey: 'docs', rawFiles: ['docs.md'], peerFileCount: 0, dependencyCount: 0 }], plannedWorkUnits: [{ unitKey: 'docs', rawFiles: ['docs.md'], peerFileCount: 0, dependencyCount: 0 }],

View file

@ -8,7 +8,6 @@ vi.mock('ai', () => ({
import { generateText } from 'ai'; import { generateText } from 'ai';
import { AiSdkKtxLlmRuntime } from '../../../src/context/llm/ai-sdk-runtime.js'; import { AiSdkKtxLlmRuntime } from '../../../src/context/llm/ai-sdk-runtime.js';
import type { RunLoopStepInfo } from '../../../src/context/llm/runtime-port.js';
describe('AiSdkKtxLlmRuntime.runAgentLoop', () => { describe('AiSdkKtxLlmRuntime.runAgentLoop', () => {
let runtime: AiSdkKtxLlmRuntime; let runtime: AiSdkKtxLlmRuntime;
@ -367,40 +366,14 @@ describe('AiSdkKtxLlmRuntime.runAgentLoop', () => {
expect(result.metrics?.usage).toEqual({}); expect(result.metrics?.usage).toEqual({});
}); });
it('invokes caller onStepFinish with incrementing stepIndex and total budget', async () => { it('counts model round-trips into metrics.stepCount', async () => {
const calls: RunLoopStepInfo[] = [];
(generateText as any).mockImplementation(async (opts: any) => { (generateText as any).mockImplementation(async (opts: any) => {
for (let i = 0; i < 3; i++) { for (let i = 0; i < 3; i++) {
await opts.onStepFinish({}); opts.onStepFinish({});
} }
return { text: 'ok', toolCalls: [], steps: [] }; return { text: 'ok', toolCalls: [], steps: [] };
}); });
await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
onStepFinish: (info) => {
calls.push(info);
},
});
expect(calls).toEqual([
{ stepIndex: 1, stepBudget: 10 },
{ stepIndex: 2, stepBudget: 10 },
{ stepIndex: 3, stepBudget: 10 },
]);
});
it('swallows errors thrown from caller onStepFinish without aborting the loop', async () => {
(generateText as any).mockImplementation(async (opts: any) => {
await opts.onStepFinish({});
return { text: 'ok', toolCalls: [], steps: [] };
});
const result = await runtime.runAgentLoop({ const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction', modelRole: 'candidateExtraction',
systemPrompt: '', systemPrompt: '',
@ -408,12 +381,10 @@ describe('AiSdkKtxLlmRuntime.runAgentLoop', () => {
toolSet: {}, toolSet: {},
stepBudget: 10, stepBudget: 10,
telemetryTags: {}, telemetryTags: {},
onStepFinish: () => {
throw new Error('boom');
},
}); });
expect(result.stopReason).toBe('natural'); expect(result.metrics?.stepCount).toBe(3);
expect(result.metrics?.stepBoundariesMs).toHaveLength(3);
}); });
it('forwards telemetryTags.source through experimental_telemetry metadata', async () => { it('forwards telemetryTags.source through experimental_telemetry metadata', async () => {

View file

@ -382,7 +382,6 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
query, query,
env: {}, env: {},
}); });
const onStepFinish = vi.fn();
await runtime.runAgentLoop({ await runtime.runAgentLoop({
modelRole: 'default', modelRole: 'default',
@ -398,7 +397,6 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
}, },
stepBudget: 1, stepBudget: 1,
telemetryTags: { operationName: 'test' }, telemetryTags: { operationName: 'test' },
onStepFinish,
}); });
const options = query.mock.calls[0][0].options; const options = query.mock.calls[0][0].options;
@ -416,7 +414,6 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
behavior: 'deny', behavior: 'deny',
toolUseID: '2', toolUseID: '2',
}); });
expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 1 });
}); });
it('treats host-discovered commands skills and agents as non-fatal init metadata for text and auth probe', async () => { it('treats host-discovered commands skills and agents as non-fatal init metadata for text and auth probe', async () => {
@ -664,108 +661,6 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
); );
}); });
it('counts only assistant turns the SDK counts toward num_turns', async () => {
const assistantMessage = (
overrides: Partial<Extract<SDKMessage, { type: 'assistant' }>> & { uuid: string },
): SDKMessage =>
({
type: 'assistant',
message: { role: 'assistant', content: [], stop_reason: 'end_turn' },
parent_tool_use_id: null,
session_id: 'session-id',
...overrides,
}) as unknown as SDKMessage;
const query = vi.fn((_input: any) =>
stream([
initMessage(),
assistantMessage({
uuid: '00000000-0000-4000-8000-0000000000a1',
error: 'max_output_tokens',
}),
assistantMessage({
uuid: '00000000-0000-4000-8000-0000000000a2',
message: { role: 'assistant', content: [], stop_reason: 'pause_turn' } as never,
}),
assistantMessage({ uuid: '00000000-0000-4000-8000-0000000000a3' }),
{
type: 'assistant',
message: { role: 'assistant', content: [], stop_reason: 'end_turn' },
parent_tool_use_id: 'tool-use-1',
uuid: '00000000-0000-4000-8000-0000000000a4',
session_id: 'session-id',
} as unknown as SDKMessage,
resultMessage({ subtype: 'success', terminal_reason: 'completed' }),
]),
);
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
});
const onStepFinish = vi.fn();
await expect(
runtime.runAgentLoop({
modelRole: 'default',
systemPrompt: 'system',
userPrompt: 'user',
toolSet: {},
stepBudget: 40,
telemetryTags: { operationName: 'test' },
onStepFinish,
}),
).resolves.toMatchObject({ stopReason: 'natural' });
expect(onStepFinish).toHaveBeenCalledTimes(1);
expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 40 });
});
it('logs and ignores onStepFinish callback errors', async () => {
const query = vi.fn((_input: any) =>
stream([
initMessage(),
{
type: 'assistant',
message: { role: 'assistant', content: [] },
parent_tool_use_id: null,
uuid: '00000000-0000-4000-8000-000000000005',
session_id: 'session-id',
} as unknown as SDKMessage,
resultMessage({ subtype: 'success', terminal_reason: 'completed' }),
]),
);
const logger = {
debug: vi.fn(),
log: vi.fn(),
warn: vi.fn(),
error: vi.fn(),
};
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
logger,
});
await expect(
runtime.runAgentLoop({
modelRole: 'default',
systemPrompt: 'system',
userPrompt: 'user',
toolSet: {},
stepBudget: 1,
telemetryTags: { operationName: 'test' },
onStepFinish: async () => {
throw new Error('callback exploded');
},
}),
).resolves.toMatchObject({ stopReason: 'natural' });
expect(logger.warn).toHaveBeenCalledWith(expect.stringContaining('callback exploded'));
});
it('maps max-turn terminal reasons to budget', () => { it('maps max-turn terminal reasons to budget', () => {
expect(mapClaudeCodeStopReason(resultMessage({ subtype: 'error_max_turns' }))).toBe('budget'); expect(mapClaudeCodeStopReason(resultMessage({ subtype: 'error_max_turns' }))).toBe('budget');
expect(mapClaudeCodeStopReason(resultMessage({ terminal_reason: 'max_turns' }))).toBe('budget'); expect(mapClaudeCodeStopReason(resultMessage({ terminal_reason: 'max_turns' }))).toBe('budget');
@ -774,20 +669,14 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
expect(mapClaudeCodeStopReason(resultMessage({ subtype: 'error_during_execution' }))).toBe('error'); expect(mapClaudeCodeStopReason(resultMessage({ subtype: 'error_during_execution' }))).toBe('error');
}); });
it('returns loop metrics including step count and mapped token usage', async () => { it('reports stepCount from the SDK result num_turns and mapped token usage', async () => {
const query = vi.fn((_input: any) => const query = vi.fn((_input: any) =>
stream([ stream([
initMessage(), initMessage(),
{
type: 'assistant',
message: { role: 'assistant', content: [] },
parent_tool_use_id: null,
uuid: '00000000-0000-4000-8000-000000000006',
session_id: 'session-id',
} as unknown as SDKMessage,
resultMessage({ resultMessage({
subtype: 'success', subtype: 'success',
terminal_reason: 'completed', terminal_reason: 'completed',
num_turns: 3,
usage: { input_tokens: 50, output_tokens: 10 } as never, usage: { input_tokens: 50, output_tokens: 10 } as never,
}), }),
]), ]),
@ -808,8 +697,9 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
telemetryTags: { operationName: 'test' }, telemetryTags: { operationName: 'test' },
}); });
expect(result.metrics?.stepCount).toBe(1); // Authoritative SDK count, not a re-derived per-message tally.
expect(result.metrics?.stepBoundariesMs).toHaveLength(1); expect(result.metrics?.stepCount).toBe(3);
expect(result.metrics?.stepBoundariesMs).toEqual([]);
expect(result.metrics?.usage).toEqual({ inputTokens: 50, outputTokens: 10, totalTokens: 60 }); expect(result.metrics?.usage).toEqual({ inputTokens: 50, outputTokens: 10, totalTokens: 60 });
}); });

View file

@ -294,7 +294,6 @@ describe('CodexKtxLlmRuntime', () => {
runner: fakeRunner, runner: fakeRunner,
startMcpServer, startMcpServer,
}); });
const onStepFinish = vi.fn();
const result = await runtime.runAgentLoop({ const result = await runtime.runAgentLoop({
modelRole: 'default', modelRole: 'default',
@ -302,7 +301,6 @@ describe('CodexKtxLlmRuntime', () => {
userPrompt: 'user', userPrompt: 'user',
stepBudget: 5, stepBudget: 5,
telemetryTags: {}, telemetryTags: {},
onStepFinish,
toolSet: { toolSet: {
aliased_wiki_tool: { aliased_wiki_tool: {
name: 'wiki_search', name: 'wiki_search',
@ -315,7 +313,6 @@ describe('CodexKtxLlmRuntime', () => {
expect(result.stopReason).toBe('natural'); expect(result.stopReason).toBe('natural');
expect(result.metrics).toMatchObject({ stepCount: 1, usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2 } }); expect(result.metrics).toMatchObject({ stepCount: 1, usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2 } });
expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 5 });
expect(startMcpServer).toHaveBeenCalledWith({ projectDir: '/tmp/project', toolSet: expect.any(Object) }); expect(startMcpServer).toHaveBeenCalledWith({ projectDir: '/tmp/project', toolSet: expect.any(Object) });
expect(fakeRunner.runStreamed).toHaveBeenCalledWith( expect(fakeRunner.runStreamed).toHaveBeenCalledWith(
expect.objectContaining({ expect.objectContaining({
@ -399,7 +396,6 @@ describe('CodexKtxLlmRuntime', () => {
modelSlots: { default: 'codex' }, modelSlots: { default: 'codex' },
runner: fakeRunner, runner: fakeRunner,
}); });
const onStepFinish = vi.fn();
const result = await runtime.runAgentLoop({ const result = await runtime.runAgentLoop({
modelRole: 'default', modelRole: 'default',
@ -407,7 +403,6 @@ describe('CodexKtxLlmRuntime', () => {
userPrompt: 'user', userPrompt: 'user',
stepBudget: 1, stepBudget: 1,
telemetryTags: {}, telemetryTags: {},
onStepFinish,
toolSet: { toolSet: {
first: { first: {
name: 'first', name: 'first',
@ -421,8 +416,6 @@ describe('CodexKtxLlmRuntime', () => {
expect(result.stopReason).toBe('budget'); expect(result.stopReason).toBe('budget');
expect(result.error).toBeUndefined(); expect(result.error).toBeUndefined();
expect(result.metrics).toMatchObject({ stepCount: 1 }); expect(result.metrics).toMatchObject({ stepCount: 1 });
expect(onStepFinish).toHaveBeenCalledTimes(1);
expect(onStepFinish).toHaveBeenCalledWith({ stepIndex: 1, stepBudget: 1 });
expect(fakeRunner.observedSignal()?.aborted).toBe(true); expect(fakeRunner.observedSignal()?.aborted).toBe(true);
}); });
@ -448,7 +441,6 @@ describe('CodexKtxLlmRuntime', () => {
modelSlots: { default: 'codex' }, modelSlots: { default: 'codex' },
runner: fakeRunner, runner: fakeRunner,
}); });
const onStepFinish = vi.fn();
const result = await runtime.runAgentLoop({ const result = await runtime.runAgentLoop({
modelRole: 'default', modelRole: 'default',
@ -456,53 +448,15 @@ describe('CodexKtxLlmRuntime', () => {
userPrompt: 'user', userPrompt: 'user',
stepBudget: 2, stepBudget: 2,
telemetryTags: {}, telemetryTags: {},
onStepFinish,
toolSet: {}, toolSet: {},
}); });
expect(result.stopReason).toBe('budget'); expect(result.stopReason).toBe('budget');
expect(result.error).toBeUndefined(); expect(result.error).toBeUndefined();
expect(result.metrics).toMatchObject({ stepCount: 2 }); expect(result.metrics).toMatchObject({ stepCount: 2 });
expect(onStepFinish).toHaveBeenCalledTimes(2);
expect(onStepFinish).toHaveBeenLastCalledWith({ stepIndex: 2, stepBudget: 2 });
expect(fakeRunner.observedSignal()?.aborted).toBe(true); expect(fakeRunner.observedSignal()?.aborted).toBe(true);
}); });
it('fires onStepFinish live as each step completes, before the stream drains', async () => {
const order: string[] = [];
async function* liveEvents() {
yield { type: 'turn.started' };
yield { type: 'item.completed', item: { type: 'mcp_tool_call', server: 'ktx', tool: 'a', status: 'completed' } };
order.push('yielded-after-step-1');
yield { type: 'item.completed', item: { type: 'mcp_tool_call', server: 'ktx', tool: 'b', status: 'completed' } };
order.push('yielded-after-step-2');
yield { type: 'item.completed', item: { type: 'agent_message', text: 'done' } };
yield { type: 'turn.completed', usage: { input_tokens: 1, output_tokens: 1 } };
}
const fakeRunner = { runStreamed: vi.fn(async () => liveEvents()) };
const runtime = new CodexKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'codex' },
runner: fakeRunner,
});
const result = await runtime.runAgentLoop({
modelRole: 'default',
systemPrompt: 'system',
userPrompt: 'user',
stepBudget: 10,
telemetryTags: {},
onStepFinish: ({ stepIndex }) => {
order.push(`step-${stepIndex}`);
},
toolSet: {},
});
expect(result.stopReason).toBe('natural');
expect(result.metrics).toMatchObject({ stepCount: 2 });
expect(order).toEqual(['step-1', 'yielded-after-step-1', 'step-2', 'yielded-after-step-2']);
});
it('surfaces the real Codex error event even when the SDK stream throws afterward', async () => { it('surfaces the real Codex error event even when the SDK stream throws afterward', async () => {
// The SDK yields the error/turn.failed events on stdout, then throws on the // The SDK yields the error/turn.failed events on stdout, then throws on the
// non-zero exit. The masked exit message must not hide the real API error. // non-zero exit. The masked exit message must not hide the real API error.

View file

@ -27,13 +27,13 @@ function snapshot(events: MemoryFlowEvent[], overrides: Partial<MemoryFlowReplay
} }
describe('buildDemoMetrics', () => { describe('buildDemoMetrics', () => {
it('estimates elapsed, agent steps, tool calls, and cost from event stream', () => { it('estimates elapsed, tool calls, and cost from event stream', () => {
const start = Date.UTC(2026, 0, 1, 0, 0, 0); const start = Date.UTC(2026, 0, 1, 0, 0, 0);
const input = snapshot( const input = snapshot(
[ [
{ type: 'source_acquired', adapter: 'live-database', trigger: 'demo_full', fileCount: 5, emittedAt: new Date(start).toISOString() }, { type: 'source_acquired', adapter: 'live-database', trigger: 'demo_full', fileCount: 5, emittedAt: new Date(start).toISOString() },
{ type: 'work_unit_started', unitKey: 'orders', skills: [], stepBudget: 40, emittedAt: new Date(start + 1000).toISOString() }, { type: 'work_unit_started', unitKey: 'orders', skills: [], emittedAt: new Date(start + 1000).toISOString() },
{ type: 'work_unit_step', unitKey: 'orders', stepIndex: 6, stepBudget: 40, emittedAt: new Date(start + 6000).toISOString() }, { type: 'work_unit_step', unitKey: 'orders', toolCalls: 6, emittedAt: new Date(start + 6000).toISOString() },
], ],
{ {
plannedWorkUnits: [ plannedWorkUnits: [
@ -51,8 +51,6 @@ describe('buildDemoMetrics', () => {
const metrics = buildDemoMetrics(input, { now: () => start + 10_000 }); const metrics = buildDemoMetrics(input, { now: () => start + 10_000 });
expect(metrics.elapsedMs).toBe(10_000); expect(metrics.elapsedMs).toBe(10_000);
expect(metrics.agentSteps).toBe(6);
expect(metrics.agentStepBudget).toBe(40);
expect(metrics.toolCalls).toBe(3); expect(metrics.toolCalls).toBe(3);
expect(metrics.workUnitsTotal).toBe(2); expect(metrics.workUnitsTotal).toBe(2);
expect(metrics.estimatedTokens).toBeGreaterThan(0); expect(metrics.estimatedTokens).toBeGreaterThan(0);
@ -71,7 +69,7 @@ describe('buildDemoMetrics', () => {
const input = snapshot( const input = snapshot(
[ [
{ type: 'source_acquired', adapter: 'a', trigger: 't', fileCount: 1, emittedAt: new Date(start).toISOString() }, { type: 'source_acquired', adapter: 'a', trigger: 't', fileCount: 1, emittedAt: new Date(start).toISOString() },
{ type: 'work_unit_started', unitKey: 'a', skills: [], stepBudget: 10, emittedAt: new Date(start + 1000).toISOString() }, { type: 'work_unit_started', unitKey: 'a', skills: [], emittedAt: new Date(start + 1000).toISOString() },
{ type: 'work_unit_finished', unitKey: 'a', status: 'success', emittedAt: new Date(start + 5000).toISOString() }, { type: 'work_unit_finished', unitKey: 'a', status: 'success', emittedAt: new Date(start + 5000).toISOString() },
], ],
{ {

View file

@ -176,8 +176,8 @@ describe('runKtxIngest', () => {
const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => { const runLocal = vi.fn(async (input: RunLocalIngestOptions): Promise<LocalIngestResult> => {
input.memoryFlow?.emit({ type: 'source_acquired', adapter: 'fake', trigger: 'manual_resync', fileCount: 2 }); input.memoryFlow?.emit({ type: 'source_acquired', adapter: 'fake', trigger: 'manual_resync', fileCount: 2 });
input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }); input.memoryFlow?.emit({ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 });
input.memoryFlow?.emit({ type: 'work_unit_started', unitKey: 'orders', skills: [], stepBudget: 4 }); input.memoryFlow?.emit({ type: 'work_unit_started', unitKey: 'orders', skills: [] });
input.memoryFlow?.emit({ type: 'work_unit_step', unitKey: 'orders', stepIndex: 2, stepBudget: 4 }); input.memoryFlow?.emit({ type: 'work_unit_step', unitKey: 'orders', toolCalls: 2 });
return completedLocalBundleRun(input, 'cli-local-progress-1'); return completedLocalBundleRun(input, 'cli-local-progress-1');
}); });
const io = makeIo(); const io = makeIo();
@ -206,7 +206,7 @@ describe('runKtxIngest', () => {
{ percent: 15, message: 'Fetched 2 source files from fake' }, { percent: 15, message: 'Fetched 2 source files from fake' },
{ percent: 45, message: 'Planned 2 tasks' }, { percent: 45, message: 'Planned 2 tasks' },
expect.objectContaining({ expect.objectContaining({
message: 'Processing tasks: 0/2 complete, 1 active; latest orders step 2/4', message: 'Processing tasks: 0/2 complete, 1 active; latest orders · 2 actions',
transient: true, transient: true,
}), }),
]), ]),
@ -776,13 +776,11 @@ describe('runKtxIngest', () => {
type: 'work_unit_started', type: 'work_unit_started',
unitKey: 'metabase-col-6', unitKey: 'metabase-col-6',
skills: ['sl_capture'], skills: ['sl_capture'],
stepBudget: 40,
}); });
input.memoryFlow?.emit({ input.memoryFlow?.emit({
type: 'work_unit_step', type: 'work_unit_step',
unitKey: 'metabase-col-6', unitKey: 'metabase-col-6',
stepIndex: 7, toolCalls: 7,
stepBudget: 40,
}); });
input.memoryFlow?.emit({ input.memoryFlow?.emit({
type: 'stage_progress', type: 'stage_progress',
@ -806,7 +804,6 @@ describe('runKtxIngest', () => {
type: 'work_unit_started', type: 'work_unit_started',
unitKey: 'metabase-col-7', unitKey: 'metabase-col-7',
skills: ['sl_capture'], skills: ['sl_capture'],
stepBudget: 40,
}); });
input.progress?.onMetabaseChildCompleted?.({ input.progress?.onMetabaseChildCompleted?.({
metabaseConnectionId: 'prod-metabase', metabaseConnectionId: 'prod-metabase',
@ -831,8 +828,8 @@ describe('runKtxIngest', () => {
{ percent: 45, message: 'Planned 1 task' }, { percent: 45, message: 'Planned 1 task' },
{ percent: 55, message: 'Processing 1/1 tasks: metabase-col-6' }, { percent: 55, message: 'Processing 1/1 tasks: metabase-col-6' },
{ {
percent: 60, percent: 55,
message: 'Processing tasks: 0/1 complete, 1 active; latest metabase-col-6 step 7/40', message: 'Processing tasks: 0/1 complete, 1 active; latest metabase-col-6 · 7 actions',
transient: true, transient: true,
}, },
{ percent: 81, message: 'Resolving text conflict for metabase-col-6' }, { percent: 81, message: 'Resolving text conflict for metabase-col-6' },
@ -1733,7 +1730,6 @@ describe('runKtxIngest', () => {
type: 'work_unit_started', type: 'work_unit_started',
unitKey: 'historic-sql-table-public-orders', unitKey: 'historic-sql-table-public-orders',
skills: ['historic_sql_table_digest'], skills: ['historic_sql_table_digest'],
stepBudget: 40,
}); });
input.memoryFlow?.emit({ input.memoryFlow?.emit({
type: 'work_unit_finished', type: 'work_unit_finished',
@ -1856,13 +1852,11 @@ describe('runKtxIngest', () => {
type: 'work_unit_started', type: 'work_unit_started',
unitKey: 'historic-sql-table-public-orders', unitKey: 'historic-sql-table-public-orders',
skills: ['historic_sql_table_digest'], skills: ['historic_sql_table_digest'],
stepBudget: 40,
}); });
input.memoryFlow?.emit({ input.memoryFlow?.emit({
type: 'work_unit_step', type: 'work_unit_step',
unitKey: 'historic-sql-table-public-orders', unitKey: 'historic-sql-table-public-orders',
stepIndex: 7, toolCalls: 7,
stepBudget: 40,
}); });
input.memoryFlow?.emit({ input.memoryFlow?.emit({
type: 'work_unit_finished', type: 'work_unit_finished',
@ -1897,7 +1891,7 @@ describe('runKtxIngest', () => {
expect(stderr).toContain('[45%] Planned 2 tasks'); expect(stderr).toContain('[45%] Planned 2 tasks');
expect(stderr).toContain('[55%] Processing 1/2 tasks: historic-sql-table-public-orders'); expect(stderr).toContain('[55%] Processing 1/2 tasks: historic-sql-table-public-orders');
expect(stderr).toContain( expect(stderr).toContain(
'\r[58%] Processing tasks: 0/2 complete, 1 active; latest historic-sql-table-public-orders step 7/40\u001b[K', '\r[55%] Processing tasks: 0/2 complete, 1 active; latest historic-sql-table-public-orders · 7 actions\u001b[K',
); );
expect(stderr).toContain('[68%] Processed 1/2 tasks'); expect(stderr).toContain('[68%] Processed 1/2 tasks');
}); });
@ -1954,11 +1948,10 @@ describe('runKtxIngest', () => {
type: 'work_unit_started', type: 'work_unit_started',
unitKey, unitKey,
skills: ['historic_sql_table_digest'], skills: ['historic_sql_table_digest'],
stepBudget: 40,
}); });
} }
for (const unitKey of workUnitKeys) { for (const unitKey of workUnitKeys) {
input.memoryFlow?.emit({ type: 'work_unit_step', unitKey, stepIndex: 1, stepBudget: 40 }); input.memoryFlow?.emit({ type: 'work_unit_step', unitKey, toolCalls: 1 });
} }
input.memoryFlow?.finish('done'); input.memoryFlow?.finish('done');
return completedLocalBundleRun(input, input.jobId ?? 'historic-concurrent-progress-job'); return completedLocalBundleRun(input, input.jobId ?? 'historic-concurrent-progress-job');
@ -1986,10 +1979,10 @@ describe('runKtxIngest', () => {
const stderr = io.stderr(); const stderr = io.stderr();
expect(stderr).toContain( expect(stderr).toContain(
'\r[56%] Processing tasks: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers step 1/40\u001b[K', '\r[55%] Processing tasks: 0/6 complete, 6 active; latest historic-sql-table-public-suppliers · 1 action\u001b[K',
); );
expect(stderr).not.toContain( expect(stderr).not.toContain(
'\n[56%] Processing 6/6 tasks: historic-sql-table-public-suppliers step 1/40\n', '\n[55%] Processing 6/6 tasks: historic-sql-table-public-suppliers · 1 action\n',
); );
expect(stderr).toContain('\n[100%] Ingest completed\n'); expect(stderr).toContain('\n[100%] Ingest completed\n');
}); });

View file

@ -46,9 +46,9 @@ function replay(): MemoryFlowReplayInput {
{ type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 }, { type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 },
{ type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 }, { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 },
{ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 4 }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
{ type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' },
{ type: 'work_unit_started', unitKey: 'customers', skills: ['wiki_capture'], stepBudget: 4 }, { type: 'work_unit_started', unitKey: 'customers', skills: ['wiki_capture'] },
{ type: 'work_unit_finished', unitKey: 'customers', status: 'failed', reason: 'validation reset' }, { type: 'work_unit_finished', unitKey: 'customers', status: 'failed', reason: 'validation reset' },
{ type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 1 }, { type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 1 },
{ type: 'saved', commitSha: 'abc12345', wikiCount: 1, slCount: 1 }, { type: 'saved', commitSha: 'abc12345', wikiCount: 1, slCount: 1 },

View file

@ -35,10 +35,10 @@ function replayInput(): MemoryFlowReplayInput {
{ type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 }, { type: 'raw_snapshot_written', syncId: 'sync-1', rawFileCount: 2 },
{ type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 }, { type: 'diff_computed', added: 1, modified: 1, deleted: 0, unchanged: 0 },
{ type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 2, workUnitCount: 2, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
{ type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' }, { type: 'candidate_action', unitKey: 'orders', target: 'wiki', action: 'created', key: 'wiki/orders.md' },
{ type: 'work_unit_finished', unitKey: 'orders', status: 'success' }, { type: 'work_unit_finished', unitKey: 'orders', status: 'success' },
{ type: 'work_unit_started', unitKey: 'customers', skills: ['sl_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'customers', skills: ['sl_capture'] },
{ type: 'candidate_action', unitKey: 'customers', target: 'sl', action: 'updated', key: 'orbit_demo.customers' }, { type: 'candidate_action', unitKey: 'customers', target: 'sl', action: 'updated', key: 'orbit_demo.customers' },
{ type: 'work_unit_finished', unitKey: 'customers', status: 'success' }, { type: 'work_unit_finished', unitKey: 'customers', status: 'success' },
{ type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 }, { type: 'reconciliation_finished', conflictCount: 0, fallbackCount: 0 },
@ -220,7 +220,7 @@ describe('MemoryFlowTuiApp', () => {
{ type: 'source_acquired', adapter: 'live-database', trigger: 'manual_resync', fileCount: 1 }, { type: 'source_acquired', adapter: 'live-database', trigger: 'manual_resync', fileCount: 1 },
{ type: 'diff_computed', added: 1, modified: 0, deleted: 0, unchanged: 0 }, { type: 'diff_computed', added: 1, modified: 0, deleted: 0, unchanged: 0 },
{ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
], ],
plannedWorkUnits: [{ unitKey: 'orders', rawFiles: ['orders'], peerFileCount: 0, dependencyCount: 1 }], plannedWorkUnits: [{ unitKey: 'orders', rawFiles: ['orders'], peerFileCount: 0, dependencyCount: 1 }],
}; };
@ -240,7 +240,7 @@ describe('MemoryFlowTuiApp', () => {
{ type: 'source_acquired', adapter: 'dbt-descriptions', trigger: 'manual_resync', fileCount: 3 }, { type: 'source_acquired', adapter: 'dbt-descriptions', trigger: 'manual_resync', fileCount: 3 },
{ type: 'diff_computed', added: 11, modified: 0, deleted: 0, unchanged: 0 }, { type: 'diff_computed', added: 11, modified: 0, deleted: 0, unchanged: 0 },
{ type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 }, { type: 'chunks_planned', chunkCount: 1, workUnitCount: 1, evictionCount: 0 },
{ type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'], stepBudget: 40 }, { type: 'work_unit_started', unitKey: 'orders', skills: ['wiki_capture'] },
], ],
plannedWorkUnits: [{ unitKey: 'orders', rawFiles: ['orders'], peerFileCount: 0, dependencyCount: 1 }], plannedWorkUnits: [{ unitKey: 'orders', rawFiles: ['orders'], peerFileCount: 0, dependencyCount: 1 }],
}; };