mirror of
https://github.com/Kaelio/ktx.git
synced 2026-06-28 08:49:38 +02:00
fix(ingest): drive work-unit progress from tool calls, not turn counts (#269)
The ingest HUD showed "step 70/40" because the Claude subscription runtime re-derived a per-turn counter that could not match the SDK's `num_turns` and therefore overshot the `maxTurns` budget. This commit replaces the turn-based `work_unit_step` heartbeat with a real, observed tool-call count (no denominator) and reports `metrics.stepCount` from the SDK's authoritative `num_turns`. It also deletes the brittle `countsAsAssistantTurn` denylist and the now-unused `onStepFinish` callback across the runtime port and all three runtimes. Reconcile and curator progress move to the same tool-call heartbeat.
This commit is contained in:
parent
18245c2373
commit
2896f9fb91
26 changed files with 138 additions and 463 deletions
|
|
@ -39,7 +39,6 @@ export interface CuratorPaginationInput {
|
|||
buildUserPrompt: (input: CuratorPaginationPromptInput) => string;
|
||||
buildToolSet: (passNumber: number) => KtxRuntimeToolSet;
|
||||
getReconciliationActions: () => MemoryAction[];
|
||||
onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void;
|
||||
abortSignal?: AbortSignal;
|
||||
}
|
||||
|
||||
|
|
@ -245,10 +244,6 @@ export class CuratorPaginationService implements CuratorPaginationPort {
|
|||
jobId: params.input.jobId,
|
||||
forceRun: params.forceRun,
|
||||
abortSignal: params.input.abortSignal,
|
||||
onStepFinish: params.input.onStepFinish
|
||||
? ({ stepIndex, stepBudget }) =>
|
||||
params.input.onStepFinish?.({ passNumber: params.passNumber, stepIndex, stepBudget })
|
||||
: undefined,
|
||||
});
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -939,14 +939,13 @@ export class IngestBundleRunner {
|
|||
workUnitSettings: { maxConcurrency: number; stepBudget: number; failureMode: 'abort' | 'continue' };
|
||||
transcriptDir: string;
|
||||
transcriptSummaries: Map<string, MutableToolTranscriptSummary>;
|
||||
recordTranscriptEntry(path: string): (entry: ToolCallLogEntry) => void;
|
||||
recordTranscriptEntry(path: string): (entry: ToolCallLogEntry) => MutableToolTranscriptSummary;
|
||||
stageIndex: StageIndex;
|
||||
includeContextEvidenceTools: boolean;
|
||||
currentTableExists(tableRef: string): Promise<boolean>;
|
||||
memoryFlow?: MemoryFlowEventSink;
|
||||
abortSignal?: AbortSignal;
|
||||
wuSkillNames: string[];
|
||||
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
|
||||
}): Promise<WorkUnitOutcome> {
|
||||
const session: CaptureSession = {
|
||||
userId: 'system',
|
||||
|
|
@ -1050,7 +1049,6 @@ export class IngestBundleRunner {
|
|||
type: 'work_unit_started',
|
||||
unitKey: input.wu.unitKey,
|
||||
skills: input.wuSkillNames,
|
||||
stepBudget: input.workUnitSettings.stepBudget,
|
||||
});
|
||||
return executeWorkUnit(
|
||||
{
|
||||
|
|
@ -1074,8 +1072,10 @@ export class IngestBundleRunner {
|
|||
slIndex: input.slIndex,
|
||||
priorProvenance: input.priorProvenance,
|
||||
}),
|
||||
buildToolSet: (wuInner) =>
|
||||
wrapToolsWithLogger(
|
||||
buildToolSet: (wuInner) => {
|
||||
const transcriptPath = join(input.transcriptDir, `${wuInner.unitKey}.jsonl`);
|
||||
const record = input.recordTranscriptEntry(transcriptPath);
|
||||
return wrapToolsWithLogger(
|
||||
buildWuToolSet({
|
||||
sourceKey: input.job.sourceKey,
|
||||
stagedDir: input.stagedDir,
|
||||
|
|
@ -1084,10 +1084,23 @@ export class IngestBundleRunner {
|
|||
emitUnmappedFallbackTool: wuEmitUnmappedFallbackTool,
|
||||
toolsetTools: wuToolset.toRuntimeTools(wuToolContext),
|
||||
}),
|
||||
join(input.transcriptDir, `${wuInner.unitKey}.jsonl`),
|
||||
transcriptPath,
|
||||
wuInner.unitKey,
|
||||
{ onEntry: input.recordTranscriptEntry(join(input.transcriptDir, `${wuInner.unitKey}.jsonl`)) },
|
||||
),
|
||||
{
|
||||
// Drive the live HUD heartbeat from real tool calls: each invocation
|
||||
// ticks the running per-unit count. This is an observed signal, not a
|
||||
// re-derived turn count, so it can never overshoot a budget.
|
||||
onEntry: (entry) => {
|
||||
const summary = record(entry);
|
||||
input.memoryFlow?.emit({
|
||||
type: 'work_unit_step',
|
||||
unitKey: wuInner.unitKey,
|
||||
toolCalls: summary.toolCallCount,
|
||||
});
|
||||
},
|
||||
},
|
||||
);
|
||||
},
|
||||
captureSession: session,
|
||||
sessionActions,
|
||||
modelRole: 'candidateExtraction',
|
||||
|
|
@ -1096,7 +1109,6 @@ export class IngestBundleRunner {
|
|||
connectionId: input.job.connectionId,
|
||||
jobId: input.job.jobId,
|
||||
toolFailureCount: (unitKey) => input.transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0,
|
||||
onStepFinish: input.onStepFinish,
|
||||
abortSignal: input.abortSignal,
|
||||
},
|
||||
input.wu,
|
||||
|
|
@ -1166,11 +1178,12 @@ export class IngestBundleRunner {
|
|||
const transcriptDir = this.deps.storage.resolveTranscriptDir(job.jobId);
|
||||
const recordTranscriptEntry =
|
||||
(path: string) =>
|
||||
(entry: ToolCallLogEntry): void => {
|
||||
(entry: ToolCallLogEntry): MutableToolTranscriptSummary => {
|
||||
const current =
|
||||
transcriptSummaries.get(entry.wuKey) ?? createMutableToolTranscriptSummary(entry.wuKey, path);
|
||||
recordToolTranscriptEntry(current, entry);
|
||||
transcriptSummaries.set(entry.wuKey, current);
|
||||
return current;
|
||||
};
|
||||
const overrideReport = await this.loadOverrideReport(job);
|
||||
|
||||
|
|
@ -1639,9 +1652,6 @@ export class IngestBundleRunner {
|
|||
abortSignal: ctx?.abortSignal,
|
||||
memoryFlow,
|
||||
wuSkillNames,
|
||||
onStepFinish: ({ stepIndex, stepBudget }) => {
|
||||
memoryFlow?.emit({ type: 'work_unit_step', unitKey: wu.unitKey, stepIndex, stepBudget });
|
||||
},
|
||||
});
|
||||
},
|
||||
});
|
||||
|
|
@ -2013,6 +2023,45 @@ export class IngestBundleRunner {
|
|||
let curatorWarnings: string[] = [];
|
||||
let reconcileOutcome: Awaited<ReturnType<typeof runReconciliationStage4>>;
|
||||
|
||||
// Reconcile shares the work-unit liveness model: the HUD heartbeat is driven
|
||||
// by real tool calls (a monotonic, observed count), not a re-derived turn
|
||||
// counter. The soft cap only paces the phase progress bar; it is never shown
|
||||
// to the user, so it cannot read as a misleading "X/Y" fraction.
|
||||
const reconcileTranscriptPath = join(transcriptDir, 'reconcile.jsonl');
|
||||
const reconcileProgressSoftCap = 40;
|
||||
const buildReconcileToolSetWithHeartbeat = (): KtxRuntimeToolSet => {
|
||||
const record = recordTranscriptEntry(reconcileTranscriptPath);
|
||||
return wrapToolsWithLogger(
|
||||
buildReconcileToolSet({
|
||||
loadSkillTool: rcLoadSkill,
|
||||
stageListTool: rcStageListTool,
|
||||
stageDiffTool: rcStageDiffTool,
|
||||
evictionListTool: rcEvictionListTool,
|
||||
emitConflictResolutionTool: rcEmitConflictResolutionTool,
|
||||
emitEvictionDecisionTool: rcEmitEvictionDecisionTool,
|
||||
emitArtifactResolutionTool: rcEmitArtifactResolutionTool,
|
||||
emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool,
|
||||
readRawSpanTool: rcRawSpanTool,
|
||||
toolsetTools: rcToolset.toRuntimeTools(rcToolContext),
|
||||
}),
|
||||
reconcileTranscriptPath,
|
||||
'reconcile',
|
||||
{
|
||||
onEntry: (entry) => {
|
||||
const summary = record(entry);
|
||||
if (!stage4) {
|
||||
return;
|
||||
}
|
||||
const label = `Reconciling results · ${summary.toolCallCount} action${
|
||||
summary.toolCallCount === 1 ? '' : 's'
|
||||
}`;
|
||||
emitStageProgress('reconciliation', 85, label, { transient: true });
|
||||
void stage4.updateProgress(Math.min(0.95, summary.toolCallCount / reconcileProgressSoftCap), label);
|
||||
},
|
||||
},
|
||||
);
|
||||
};
|
||||
|
||||
const reconcileStartedAt = Date.now();
|
||||
const reconcileMode = contextReport && this.deps.curatorPagination ? 'curator' : 'single';
|
||||
if (contextReport && this.deps.curatorPagination) {
|
||||
|
|
@ -2035,39 +2084,8 @@ export class IngestBundleRunner {
|
|||
}),
|
||||
buildUserPrompt: ({ summary, items, runState }) =>
|
||||
buildReconcileUserPrompt(stageIndex, eviction, { summary, items }, reconcileNotes, runState),
|
||||
buildToolSet: (_passNumber) =>
|
||||
wrapToolsWithLogger(
|
||||
buildReconcileToolSet({
|
||||
loadSkillTool: rcLoadSkill,
|
||||
stageListTool: rcStageListTool,
|
||||
stageDiffTool: rcStageDiffTool,
|
||||
evictionListTool: rcEvictionListTool,
|
||||
emitConflictResolutionTool: rcEmitConflictResolutionTool,
|
||||
emitEvictionDecisionTool: rcEmitEvictionDecisionTool,
|
||||
emitArtifactResolutionTool: rcEmitArtifactResolutionTool,
|
||||
emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool,
|
||||
readRawSpanTool: rcRawSpanTool,
|
||||
toolsetTools: rcToolset.toRuntimeTools(rcToolContext),
|
||||
}),
|
||||
join(transcriptDir, 'reconcile.jsonl'),
|
||||
'reconcile',
|
||||
{ onEntry: recordTranscriptEntry(join(transcriptDir, 'reconcile.jsonl')) },
|
||||
),
|
||||
buildToolSet: (_passNumber) => buildReconcileToolSetWithHeartbeat(),
|
||||
getReconciliationActions: () => reconcileActions,
|
||||
onStepFinish: stage4
|
||||
? ({ passNumber, stepIndex, stepBudget }) => {
|
||||
emitStageProgress(
|
||||
'reconciliation',
|
||||
85,
|
||||
`Reconciling results: pass ${passNumber} step ${stepIndex}/${stepBudget}`,
|
||||
{ transient: true },
|
||||
);
|
||||
void stage4.updateProgress(
|
||||
stepIndex / stepBudget,
|
||||
`Reconciling results · pass ${passNumber} step ${stepIndex}`,
|
||||
);
|
||||
}
|
||||
: undefined,
|
||||
abortSignal: ctx?.abortSignal,
|
||||
});
|
||||
curatorReport = curatorOutcome.report;
|
||||
|
|
@ -2091,38 +2109,13 @@ export class IngestBundleRunner {
|
|||
canonicalPins: relevantCanonicalPins,
|
||||
}),
|
||||
buildUserPrompt: (idx, ev) => buildReconcileUserPrompt(idx, ev, undefined, reconcileNotes),
|
||||
buildToolSet: () =>
|
||||
wrapToolsWithLogger(
|
||||
buildReconcileToolSet({
|
||||
loadSkillTool: rcLoadSkill,
|
||||
stageListTool: rcStageListTool,
|
||||
stageDiffTool: rcStageDiffTool,
|
||||
evictionListTool: rcEvictionListTool,
|
||||
emitConflictResolutionTool: rcEmitConflictResolutionTool,
|
||||
emitEvictionDecisionTool: rcEmitEvictionDecisionTool,
|
||||
emitArtifactResolutionTool: rcEmitArtifactResolutionTool,
|
||||
emitUnmappedFallbackTool: rcEmitUnmappedFallbackTool,
|
||||
readRawSpanTool: rcRawSpanTool,
|
||||
toolsetTools: rcToolset.toRuntimeTools(rcToolContext),
|
||||
}),
|
||||
join(transcriptDir, 'reconcile.jsonl'),
|
||||
'reconcile',
|
||||
{ onEntry: recordTranscriptEntry(join(transcriptDir, 'reconcile.jsonl')) },
|
||||
),
|
||||
buildToolSet: () => buildReconcileToolSetWithHeartbeat(),
|
||||
modelRole: 'reconcile',
|
||||
stepBudget: 60,
|
||||
sourceKey: job.sourceKey,
|
||||
jobId: job.jobId,
|
||||
force: !!overrideReport,
|
||||
abortSignal: ctx?.abortSignal,
|
||||
onStepFinish: stage4
|
||||
? ({ stepIndex, stepBudget }) => {
|
||||
emitStageProgress('reconciliation', 85, `Reconciling results: step ${stepIndex}/${stepBudget}`, {
|
||||
transient: true,
|
||||
});
|
||||
void stage4.updateProgress(stepIndex / stepBudget, `Reconciling results · step ${stepIndex}`);
|
||||
}
|
||||
: undefined,
|
||||
});
|
||||
}
|
||||
await runTrace.event(
|
||||
|
|
|
|||
|
|
@ -174,7 +174,7 @@ export function ingestReportToMemoryFlowReplay(
|
|||
|
||||
const actions = allReportActions(report);
|
||||
const workUnitEvents: MemoryFlowEvent[] = report.body.workUnits.flatMap((workUnit) => [
|
||||
{ type: 'work_unit_started', unitKey: workUnit.unitKey, skills: [], stepBudget: 0 } satisfies MemoryFlowEvent,
|
||||
{ type: 'work_unit_started', unitKey: workUnit.unitKey, skills: [] } satisfies MemoryFlowEvent,
|
||||
...workUnit.actions.map(
|
||||
(action): MemoryFlowEvent => ({
|
||||
type: 'candidate_action',
|
||||
|
|
|
|||
|
|
@ -81,13 +81,11 @@ const memoryFlowEventSchema = z.discriminatedUnion('type', [
|
|||
type: z.literal('work_unit_started'),
|
||||
unitKey: z.string().min(1),
|
||||
skills: z.array(z.string().min(1)),
|
||||
stepBudget: z.number().int().min(0),
|
||||
}),
|
||||
eventSchema({
|
||||
type: z.literal('work_unit_step'),
|
||||
unitKey: z.string().min(1),
|
||||
stepIndex: z.number().int().min(0),
|
||||
stepBudget: z.number().int().min(0),
|
||||
toolCalls: z.number().int().min(0),
|
||||
}),
|
||||
eventSchema({
|
||||
type: z.literal('candidate_action'),
|
||||
|
|
|
|||
|
|
@ -71,13 +71,11 @@ type MemoryFlowEventPayload =
|
|||
type: 'work_unit_started';
|
||||
unitKey: string;
|
||||
skills: string[];
|
||||
stepBudget: number;
|
||||
}
|
||||
| {
|
||||
type: 'work_unit_step';
|
||||
unitKey: string;
|
||||
stepIndex: number;
|
||||
stepBudget: number;
|
||||
toolCalls: number;
|
||||
}
|
||||
| {
|
||||
type: 'candidate_action';
|
||||
|
|
|
|||
|
|
@ -324,7 +324,6 @@ export interface CuratorPaginationPort {
|
|||
}) => string;
|
||||
buildToolSet: (passNumber: number) => KtxRuntimeToolSet;
|
||||
getReconciliationActions: () => MemoryAction[];
|
||||
onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void;
|
||||
abortSignal?: AbortSignal;
|
||||
}): Promise<ReconciliationOutcome & { report: CuratorPaginationReport; warnings: string[] }>;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,7 +28,6 @@ export interface WorkUnitExecutionDeps {
|
|||
sourceKey: string;
|
||||
connectionId: string;
|
||||
jobId: string;
|
||||
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
|
||||
abortSignal?: AbortSignal;
|
||||
toolFailureCount?: (unitKey: string) => number;
|
||||
}
|
||||
|
|
@ -107,7 +106,6 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit)
|
|||
unitKey: wu.unitKey,
|
||||
jobId: deps.jobId,
|
||||
},
|
||||
onStepFinish: deps.onStepFinish,
|
||||
abortSignal: deps.abortSignal,
|
||||
});
|
||||
} catch (error) {
|
||||
|
|
|
|||
|
|
@ -15,7 +15,6 @@ export interface ReconciliationContext {
|
|||
sourceKey: string;
|
||||
jobId: string;
|
||||
force?: boolean;
|
||||
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
|
||||
abortSignal?: AbortSignal;
|
||||
forceRun?: boolean;
|
||||
}
|
||||
|
|
@ -40,7 +39,6 @@ export async function runReconciliationStage4(ctx: ReconciliationContext): Promi
|
|||
toolSet: ctx.buildToolSet(),
|
||||
stepBudget: ctx.stepBudget,
|
||||
telemetryTags: { operationName: 'ingest-bundle-reconcile', source: ctx.sourceKey, jobId: ctx.jobId },
|
||||
onStepFinish: ctx.onStepFinish,
|
||||
abortSignal: ctx.abortSignal,
|
||||
});
|
||||
return { skipped: false, stopReason: run.stopReason, error: run.error, ...(run.metrics ? { metrics: run.metrics } : {}) };
|
||||
|
|
|
|||
|
|
@ -322,21 +322,11 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
|
|||
messages: promptMessages.messages,
|
||||
tools: built.tools as ToolSet,
|
||||
...(params.abortSignal ? { abortSignal: params.abortSignal } : {}),
|
||||
onStepFinish: async () => {
|
||||
// Count model round-trips locally for metrics. `stepCountIs(stepBudget)`
|
||||
// caps the loop, so this counter never exceeds the budget.
|
||||
onStepFinish: () => {
|
||||
stepIndex += 1;
|
||||
stepBoundariesMs.push(Date.now() - startedAt);
|
||||
if (!params.onStepFinish) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await params.onStepFinish({ stepIndex, stepBudget: params.stepBudget });
|
||||
} catch (err) {
|
||||
this.logger.warn(
|
||||
`[agent-runner] onStepFinish callback threw; ignoring: ${
|
||||
err instanceof Error ? err.message : String(err)
|
||||
}`,
|
||||
);
|
||||
}
|
||||
},
|
||||
};
|
||||
const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), params.abortSignal, () => generateText(request));
|
||||
|
|
|
|||
|
|
@ -6,7 +6,6 @@ import {
|
|||
type SDKResultMessage,
|
||||
} from '@anthropic-ai/claude-agent-sdk';
|
||||
import { z } from 'zod';
|
||||
import { noopLogger, type KtxLogger } from '../../context/core/config.js';
|
||||
import { createAbortError, isAbortError, throwIfAborted } from '../core/abort.js';
|
||||
import { createKtxClaudeCodeEnv } from './claude-code-env.js';
|
||||
import { resolveClaudeCodeModel } from './claude-code-models.js';
|
||||
|
|
@ -53,7 +52,6 @@ export interface ClaudeCodeKtxLlmRuntimeDeps {
|
|||
modelSlots: { default: string } & Partial<Record<string, string>>;
|
||||
query?: QueryFn;
|
||||
env?: NodeJS.ProcessEnv;
|
||||
logger?: KtxLogger;
|
||||
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
|
||||
}
|
||||
|
||||
|
|
@ -85,22 +83,6 @@ function isResult(message: SDKMessage): message is SDKResultMessage {
|
|||
return message.type === 'result';
|
||||
}
|
||||
|
||||
// Skip emissions the SDK does not count toward `num_turns`: `pause_turn` continuations and
|
||||
// errored partials (e.g. `max_output_tokens`) it retries internally. Without this, the
|
||||
// runtime's step counter outruns `maxTurns` and the HUD renders e.g. `step 69/40`.
|
||||
function countsAsAssistantTurn(message: SDKMessage): boolean {
|
||||
if (message.type !== 'assistant' || message.parent_tool_use_id !== null) {
|
||||
return false;
|
||||
}
|
||||
if (message.error !== undefined) {
|
||||
return false;
|
||||
}
|
||||
if (message.message.stop_reason === 'pause_turn') {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
function resultError(result: SDKResultMessage): Error | undefined {
|
||||
if (result.subtype === 'success') {
|
||||
return undefined;
|
||||
|
|
@ -296,7 +278,6 @@ async function collectResult(params: {
|
|||
options: Options;
|
||||
allowedToolIds: Set<string>;
|
||||
expectedMcpServerNames: Set<string>;
|
||||
onAssistantTurn?: () => Promise<void>;
|
||||
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
|
||||
abortSignal?: AbortSignal;
|
||||
}): Promise<ClaudeQueryOutcome> {
|
||||
|
|
@ -321,9 +302,6 @@ async function collectResult(params: {
|
|||
params.rateLimitGovernor?.report(rateLimitSignal);
|
||||
}
|
||||
assertInitIsolation(message, params.allowedToolIds, params.expectedMcpServerNames);
|
||||
if (countsAsAssistantTurn(message)) {
|
||||
await params.onAssistantTurn?.();
|
||||
}
|
||||
if (isResult(message)) {
|
||||
result = message;
|
||||
}
|
||||
|
|
@ -358,11 +336,9 @@ async function collectResultWithRateLimitRetry(params: Parameters<typeof collect
|
|||
|
||||
export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
|
||||
private readonly runQuery: QueryFn;
|
||||
private readonly logger: KtxLogger;
|
||||
|
||||
constructor(private readonly deps: ClaudeCodeKtxLlmRuntimeDeps) {
|
||||
this.runQuery = deps.query ?? defaultQuery;
|
||||
this.logger = deps.logger ?? noopLogger;
|
||||
}
|
||||
|
||||
async generateText(input: KtxGenerateTextInput): Promise<string> {
|
||||
|
|
@ -434,9 +410,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
|
|||
}
|
||||
|
||||
async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> {
|
||||
let stepIndex = 0;
|
||||
const startedAt = Date.now();
|
||||
const stepBoundariesMs: number[] = [];
|
||||
try {
|
||||
const options = baseOptions({
|
||||
projectDir: this.deps.projectDir,
|
||||
|
|
@ -453,22 +427,6 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
|
|||
expectedMcpServerNames: expectedMcpServerNames(params.toolSet),
|
||||
rateLimitGovernor: this.deps.rateLimitGovernor,
|
||||
abortSignal: params.abortSignal,
|
||||
onAssistantTurn: async () => {
|
||||
stepIndex += 1;
|
||||
stepBoundariesMs.push(Date.now() - startedAt);
|
||||
if (!params.onStepFinish) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
await params.onStepFinish({ stepIndex, stepBudget: params.stepBudget });
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`[claude-code-runner] onStepFinish callback threw; ignoring: ${
|
||||
error instanceof Error ? error.message : String(error)
|
||||
}`,
|
||||
);
|
||||
}
|
||||
},
|
||||
});
|
||||
const stopReason = mapClaudeCodeStopReason(result);
|
||||
const error = resultError(result);
|
||||
|
|
@ -477,8 +435,12 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
|
|||
...(stopReason === 'error' && error ? { error } : {}),
|
||||
metrics: {
|
||||
totalMs: Date.now() - startedAt,
|
||||
stepCount: stepIndex,
|
||||
stepBoundariesMs,
|
||||
// Authoritative turn count from the SDK result. The runtime no longer
|
||||
// re-derives a per-turn counter: it could not match the SDK's `num_turns`
|
||||
// and overshot `maxTurns` (the source of the misleading `step 70/40`).
|
||||
// Per-step boundaries require that counter and are not consumed anywhere.
|
||||
stepCount: result.num_turns,
|
||||
stepBoundariesMs: [],
|
||||
usage: claudeTokenUsage(result),
|
||||
},
|
||||
};
|
||||
|
|
@ -490,7 +452,7 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
|
|||
return {
|
||||
stopReason: 'error',
|
||||
error: err,
|
||||
metrics: { totalMs: Date.now() - startedAt, stepCount: stepIndex, stepBoundariesMs, usage: {} },
|
||||
metrics: { totalMs: Date.now() - startedAt, stepCount: 0, stepBoundariesMs: [], usage: {} },
|
||||
};
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,5 +1,4 @@
|
|||
import { z } from 'zod';
|
||||
import { noopLogger, type KtxLogger } from '../core/config.js';
|
||||
import { isAbortError, linkAbortSignal } from '../core/abort.js';
|
||||
import { isCompletedAgentStep, summarizeCodexExecEvents, type CodexExecEventSummary } from './codex-exec-events.js';
|
||||
import {
|
||||
|
|
@ -25,7 +24,6 @@ export interface CodexKtxLlmRuntimeDeps {
|
|||
modelSlots: { default: string } & Partial<Record<string, string>>;
|
||||
runner?: CodexSdkRunner;
|
||||
startMcpServer?: (input: { projectDir: string; toolSet: KtxRuntimeToolSet }) => Promise<CodexRuntimeMcpServerHandle>;
|
||||
logger?: KtxLogger;
|
||||
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
|
||||
}
|
||||
|
||||
|
|
@ -40,7 +38,6 @@ function promptWithSystem(system: string | undefined, prompt: string): string {
|
|||
interface CollectCodexEventsOptions {
|
||||
stepBudget?: number;
|
||||
abortController?: AbortController;
|
||||
onStep?: (stepIndex: number) => void | Promise<void>;
|
||||
}
|
||||
|
||||
interface CollectCodexEventsResult {
|
||||
|
|
@ -58,8 +55,8 @@ function isTurnCompleted(event: unknown): boolean {
|
|||
}
|
||||
|
||||
/**
|
||||
* Drains the Codex stream once, emitting a step as each agent action completes
|
||||
* so callers see live progress and the step budget is enforced mid-run. Every
|
||||
* Drains the Codex stream once, counting each completed agent action so the
|
||||
* step budget is enforced mid-run. Every
|
||||
* completed agent-action item counts (see {@link isCompletedAgentStep}), so
|
||||
* built-in `command_execution` steps decrement the budget the same as
|
||||
* `mcp_tool_call`s. A turn that produced no actions still counts as one step,
|
||||
|
|
@ -93,7 +90,6 @@ async function collectEvents(
|
|||
}
|
||||
|
||||
completedSteps += 1;
|
||||
await options.onStep?.(completedSteps);
|
||||
if (isActionStep && options.stepBudget !== undefined && completedSteps >= options.stepBudget) {
|
||||
budgetExceeded = true;
|
||||
options.abortController?.abort();
|
||||
|
|
@ -170,11 +166,9 @@ function isCodexRateLimitError(error: Error | undefined): boolean {
|
|||
|
||||
export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
|
||||
private readonly runner: CodexSdkRunner;
|
||||
private readonly logger: KtxLogger;
|
||||
|
||||
constructor(private readonly deps: CodexKtxLlmRuntimeDeps) {
|
||||
this.runner = deps.runner ?? new CodexSdkCliRunner();
|
||||
this.logger = deps.logger ?? noopLogger;
|
||||
}
|
||||
|
||||
private async runWithRateLimitRetry<T>(
|
||||
|
|
@ -328,15 +322,6 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
|
|||
}
|
||||
: {}),
|
||||
});
|
||||
const onStep = async (stepIndex: number): Promise<void> => {
|
||||
try {
|
||||
await params.onStepFinish?.({ stepIndex, stepBudget: params.stepBudget });
|
||||
} catch (error) {
|
||||
this.logger.warn(
|
||||
`[codex-runner] onStepFinish callback threw; ignoring: ${error instanceof Error ? error.message : String(error)}`,
|
||||
);
|
||||
}
|
||||
};
|
||||
const result = await this.runWithRateLimitRetry(
|
||||
params.abortSignal,
|
||||
async () => {
|
||||
|
|
@ -352,7 +337,7 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
|
|||
env: config.env,
|
||||
signal: abortController.signal,
|
||||
}),
|
||||
{ stepBudget: params.stepBudget, abortController, onStep },
|
||||
{ stepBudget: params.stepBudget, abortController },
|
||||
);
|
||||
const summary = summarizeCodexExecEvents(collected.events, { startedAt });
|
||||
return { collected, summary };
|
||||
|
|
|
|||
|
|
@ -17,12 +17,6 @@ export type KtxRuntimeToolSet = Record<string, KtxRuntimeToolDescriptor>;
|
|||
|
||||
export type RunLoopStopReason = 'budget' | 'natural' | 'error';
|
||||
|
||||
/** @internal */
|
||||
export interface RunLoopStepInfo {
|
||||
stepIndex: number;
|
||||
stepBudget: number;
|
||||
}
|
||||
|
||||
export interface LlmTokenUsage {
|
||||
inputTokens?: number;
|
||||
outputTokens?: number;
|
||||
|
|
@ -48,7 +42,6 @@ export interface RunLoopParams {
|
|||
toolSet: KtxRuntimeToolSet;
|
||||
stepBudget: number;
|
||||
telemetryTags: Record<string, string>;
|
||||
onStepFinish?: (info: RunLoopStepInfo) => void | Promise<void>;
|
||||
abortSignal?: AbortSignal;
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -15,8 +15,6 @@ interface DemoMetricsTuning {
|
|||
interface DemoMetricsSnapshot {
|
||||
elapsedMs: number;
|
||||
etaMs: number | null;
|
||||
agentSteps: number;
|
||||
agentStepBudget: number;
|
||||
toolCalls: number;
|
||||
workUnitsStarted: number;
|
||||
workUnitsFinished: number;
|
||||
|
|
@ -37,18 +35,6 @@ function eventsOf<T extends MemoryFlowEvent['type']>(
|
|||
return events.filter((event): event is Extract<MemoryFlowEvent, { type: T }> => event.type === type);
|
||||
}
|
||||
|
||||
function maxAgentStep(events: MemoryFlowEvent[]): { step: number; budget: number } {
|
||||
const steps = eventsOf(events, 'work_unit_step');
|
||||
const started = eventsOf(events, 'work_unit_started');
|
||||
const stepIndex = steps.reduce((max, event) => Math.max(max, event.stepIndex), 0);
|
||||
const stepBudget = Math.max(
|
||||
0,
|
||||
...steps.map((event) => event.stepBudget),
|
||||
...started.map((event) => event.stepBudget),
|
||||
);
|
||||
return { step: stepIndex, budget: stepBudget };
|
||||
}
|
||||
|
||||
function totalToolCalls(input: MemoryFlowReplayInput): number {
|
||||
return input.details.transcripts.reduce((total, transcript) => total + transcript.toolCallCount, 0);
|
||||
}
|
||||
|
|
@ -96,11 +82,10 @@ export function buildDemoMetrics(
|
|||
const nowMs = (options.now ?? Date.now)();
|
||||
const elapsedMs = elapsedMsFromEvents(input.events, nowMs);
|
||||
|
||||
const { step, budget } = maxAgentStep(input.events);
|
||||
const toolCalls = totalToolCalls(input);
|
||||
const progress = workUnitProgress(input);
|
||||
const finishedCount = eventsOf(input.events, 'work_unit_finished').length;
|
||||
const stepDriver = Math.max(step, toolCalls, finishedCount * 4);
|
||||
const stepDriver = Math.max(toolCalls, finishedCount * 4);
|
||||
|
||||
const inputTokens = stepDriver * inputTokensPerStep;
|
||||
const outputTokens = stepDriver * outputTokensPerStep;
|
||||
|
|
@ -113,8 +98,6 @@ export function buildDemoMetrics(
|
|||
return {
|
||||
elapsedMs,
|
||||
etaMs: estimateEtaMs(elapsedMs, progress.finished, progress.total, input.status),
|
||||
agentSteps: step,
|
||||
agentStepBudget: budget,
|
||||
toolCalls,
|
||||
workUnitsStarted: progress.started,
|
||||
workUnitsFinished: progress.finished,
|
||||
|
|
|
|||
|
|
@ -398,9 +398,8 @@ function plainIngestEventProgress(
|
|||
const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
|
||||
const completed = completedWorkUnitCountThrough(snapshot, eventIndex);
|
||||
const active = activeWorkUnitCountThrough(snapshot, eventIndex);
|
||||
const stepFraction = event.stepBudget > 0 ? Math.min(1, event.stepIndex / event.stepBudget) : 0;
|
||||
const percent = total > 0 ? 55 + Math.ceil(((completed + stepFraction) / total) * 25) : 55;
|
||||
const latest = `${event.unitKey} step ${event.stepIndex}/${event.stepBudget}`;
|
||||
const percent = total > 0 ? 55 + Math.ceil((completed / total) * 25) : 55;
|
||||
const latest = `${event.unitKey} · ${pluralize(event.toolCalls, 'action')}`;
|
||||
return {
|
||||
percent,
|
||||
message: `Processing tasks: ${completed}/${total} complete, ${active} active; latest ${latest}`,
|
||||
|
|
|
|||
|
|
@ -139,31 +139,21 @@ function sourceDescription(input: MemoryFlowReplayInput): SourceInfo {
|
|||
return { type: info.type, name: conn, sourceCount: count, itemNounPlural: info.plural, readingVerb: info.verb, ingestDescription: info.description };
|
||||
}
|
||||
|
||||
function activeWorkUnits(
|
||||
input: MemoryFlowReplayInput,
|
||||
): Array<{ unitKey: string; stepIndex: number; stepBudget: number }> {
|
||||
function activeWorkUnits(input: MemoryFlowReplayInput): string[] {
|
||||
const finishedKeys = new Set<string>();
|
||||
const unitMap = new Map<string, { stepIndex: number; stepBudget: number }>();
|
||||
|
||||
for (const e of input.events) {
|
||||
if (e.type === 'work_unit_started') {
|
||||
unitMap.set(e.unitKey, { stepIndex: 0, stepBudget: e.stepBudget });
|
||||
}
|
||||
if (e.type === 'work_unit_step') {
|
||||
const existing = unitMap.get(e.unitKey);
|
||||
if (existing) {
|
||||
existing.stepIndex = e.stepIndex;
|
||||
existing.stepBudget = e.stepBudget;
|
||||
}
|
||||
}
|
||||
if (e.type === 'work_unit_finished') finishedKeys.add(e.unitKey);
|
||||
}
|
||||
|
||||
const result: Array<{ unitKey: string; stepIndex: number; stepBudget: number }> = [];
|
||||
for (const [unitKey, data] of unitMap) {
|
||||
if (!finishedKeys.has(unitKey)) result.push({ unitKey, ...data });
|
||||
const active: string[] = [];
|
||||
const seen = new Set<string>();
|
||||
for (const e of input.events) {
|
||||
if (e.type === 'work_unit_started' && !finishedKeys.has(e.unitKey) && !seen.has(e.unitKey)) {
|
||||
seen.add(e.unitKey);
|
||||
active.push(e.unitKey);
|
||||
}
|
||||
}
|
||||
return result;
|
||||
return active;
|
||||
}
|
||||
|
||||
function queuedWorkUnits(input: MemoryFlowReplayInput): string[] {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue