diff --git a/docs-site/content/docs/cli-reference/ktx-ingest.mdx b/docs-site/content/docs/cli-reference/ktx-ingest.mdx index 80820efa..ab3d231d 100644 --- a/docs-site/content/docs/cli-reference/ktx-ingest.mdx +++ b/docs-site/content/docs/cli-reference/ktx-ingest.mdx @@ -177,7 +177,9 @@ Slowest phase: reconciliation (2m 05s, 48% of wall time). 2 work units (1 failed Work units run serially by default (`ingest.workUnits.maxConcurrency` is `1`); raise it in `ktx.yaml` if the profile shows the run is bound by serialized -work-unit agent loops. +work-unit agent loops. If the provider reports an LLM rate limit, **ktx** shows +a transient wait message and temporarily reduces effective work-unit concurrency +according to `ingest.rateLimit`. ## Common errors diff --git a/docs-site/content/docs/configuration/ktx-yaml.mdx b/docs-site/content/docs/configuration/ktx-yaml.mdx index 17a04c53..831e678a 100644 --- a/docs-site/content/docs/configuration/ktx-yaml.mdx +++ b/docs-site/content/docs/configuration/ktx-yaml.mdx @@ -452,6 +452,16 @@ ingest: stepBudget: 40 maxConcurrency: 2 failureMode: continue + rateLimit: + enabled: true + throttleThreshold: 0.8 + minConcurrencyUnderPressure: 1 + maxWaitMs: 600000 + retry: + maxAttempts: 6 + baseDelayMs: 1000 + maxDelayMs: 60000 + jitter: true ``` ### Adapters @@ -498,6 +508,24 @@ handles failures. | `workUnits.maxConcurrency` | `int > 0` | `1` | How many work units run in parallel. | | `workUnits.failureMode` | `abort` \| `continue` | `continue` | `abort` stops the whole ingest run on the first failure; `continue` records it and keeps going. | +### Rate limits + +`rateLimit` controls provider-neutral pacing for LLM calls during ingest. When a +provider reports a subscription window, retry-after delay, or HTTP 429, +**ktx** pauses new work-unit model calls, shows a transient wait in the CLI, +and reduces work-unit concurrency while the provider is under pressure. + +| Field | Type | Default | Purpose | +|-------|------|---------|---------| +| `rateLimit.enabled` | `boolean` | `true` | Master switch for ingest LLM rate-limit pacing and visible waits. | +| `rateLimit.throttleThreshold` | `number between 0 and 1` | `0.8` | Fraction of a known provider window at which **ktx** starts reducing concurrency. | +| `rateLimit.minConcurrencyUnderPressure` | `int > 0` | `1` | Effective work-unit concurrency while a provider is under rate-limit pressure. | +| `rateLimit.maxWaitMs` | `int > 0` | unset | Caps how long a single provider-reset wait can last. This bounds each wait, not the whole run: after a capped wait elapses **ktx** retries and may pause again. Omit to wait until the provider's reset time. | +| `rateLimit.retry.maxAttempts` | `int > 0` | `6` | Maximum attempts for a single rate-limited LLM call before the failure surfaces (counts the first try). Also bounds how far opaque backoff grows for responses without a reset time or retry-after value. | +| `rateLimit.retry.baseDelayMs` | `int > 0` | `1000` | Initial opaque retry delay in milliseconds. | +| `rateLimit.retry.maxDelayMs` | `int > 0` | `60000` | Maximum opaque retry delay in milliseconds. | +| `rateLimit.retry.jitter` | `boolean` | `true` | Add jitter to opaque retry delays. | + ## `scan` `scan` configures how schema-level inputs become structured context: diff --git a/packages/cli/src/context/core/abort.ts b/packages/cli/src/context/core/abort.ts new file mode 100644 index 00000000..95467c52 --- /dev/null +++ b/packages/cli/src/context/core/abort.ts @@ -0,0 +1,39 @@ +/** @internal */ +export function createAbortError(message = 'Aborted'): DOMException { + return new DOMException(message, 'AbortError'); +} + +export function isAbortError(error: unknown): boolean { + if (error instanceof DOMException && error.name === 'AbortError') { + return true; + } + if (!error || typeof error !== 'object') { + return false; + } + const record = error as { name?: unknown; code?: unknown }; + return record.name === 'AbortError' || record.code === 'ABORT_ERR'; +} + +/** @internal */ +export function throwIfAborted(signal?: AbortSignal): void { + if (signal?.aborted) { + throw createAbortError(); + } +} + +export function linkAbortSignal(parent?: AbortSignal): { controller: AbortController; dispose: () => void } { + const controller = new AbortController(); + if (!parent) { + return { controller, dispose: () => undefined }; + } + if (parent.aborted) { + controller.abort(createAbortError()); + return { controller, dispose: () => undefined }; + } + const onAbort = () => controller.abort(createAbortError()); + parent.addEventListener('abort', onAbort, { once: true }); + return { + controller, + dispose: () => parent.removeEventListener('abort', onAbort), + }; +} diff --git a/packages/cli/src/context/ingest/context-candidates/curator-pagination.service.ts b/packages/cli/src/context/ingest/context-candidates/curator-pagination.service.ts index 348544ca..7848fab7 100644 --- a/packages/cli/src/context/ingest/context-candidates/curator-pagination.service.ts +++ b/packages/cli/src/context/ingest/context-candidates/curator-pagination.service.ts @@ -40,6 +40,7 @@ export interface CuratorPaginationInput { buildToolSet: (passNumber: number) => KtxRuntimeToolSet; getReconciliationActions: () => MemoryAction[]; onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void; + abortSignal?: AbortSignal; } interface CuratorPaginationResult extends ReconciliationOutcome { @@ -243,6 +244,7 @@ export class CuratorPaginationService implements CuratorPaginationPort { sourceKey: params.input.sourceKey, jobId: params.input.jobId, forceRun: params.forceRun, + abortSignal: params.input.abortSignal, onStepFinish: params.input.onStepFinish ? ({ stepIndex, stepBudget }) => params.input.onStepFinish?.({ passNumber: params.passNumber, stepIndex, stepBudget }) diff --git a/packages/cli/src/context/ingest/final-gate-repair.ts b/packages/cli/src/context/ingest/final-gate-repair.ts index 1c373aa6..f32178d8 100644 --- a/packages/cli/src/context/ingest/final-gate-repair.ts +++ b/packages/cli/src/context/ingest/final-gate-repair.ts @@ -21,6 +21,7 @@ export interface RepairFinalGateFailureInput { repairKind: FinalGateRepairKind; maxAttempts?: number; stepBudget?: number; + abortSignal?: AbortSignal; } const readRepairFileSchema = z.object({ @@ -200,6 +201,7 @@ export async function repairFinalGateFailure( jobId: input.trace.context.jobId, repairKind: input.repairKind, }, + abortSignal: input.abortSignal, }), ); diff --git a/packages/cli/src/context/ingest/ingest-bundle.runner.ts b/packages/cli/src/context/ingest/ingest-bundle.runner.ts index 3f2b41d3..a242d58a 100644 --- a/packages/cli/src/context/ingest/ingest-bundle.runner.ts +++ b/packages/cli/src/context/ingest/ingest-bundle.runner.ts @@ -3,6 +3,7 @@ import { dirname, join } from 'node:path'; import pLimit from 'p-limit'; import { z } from 'zod'; import { type KtxLogger, noopLogger } from '../../context/core/config.js'; +import type { RateLimitWaitState } from '../../context/llm/rate-limit-governor.js'; import { createRuntimeToolDescriptorFromAiTool } from '../../context/llm/runtime-tools.js'; import type { KtxRuntimeToolSet } from '../../context/llm/runtime-port.js'; import type { CaptureSession, MemoryAction } from '../../context/memory/types.js'; @@ -219,6 +220,10 @@ export class IngestBundleRunner { } async run(job: IngestBundleJob, ctx?: IngestJobContext): Promise { + const unsubscribeRateLimitGovernor = this.subscribeRateLimitGovernor({ + trace: this.createTrace(job), + memoryFlow: ctx?.memoryFlow, + }); const key = job.connectionId; const previous = this.chainByConnection.get(key); if (previous) { @@ -241,10 +246,72 @@ export class IngestBundleRunner { ctx?.memoryFlow?.finish('error', [sanitizeMemoryFlowError(error)]); throw error; } finally { + unsubscribeRateLimitGovernor(); await this.maybeEmitIngestProfile(job.jobId); } } + private formatRateLimitWait( + state: Extract, + ): string { + const seconds = Math.ceil(state.remainingMs / 1_000); + const minutes = Math.floor(seconds / 60); + const remainder = seconds % 60; + const duration = minutes > 0 ? `${minutes}m${String(remainder).padStart(2, '0')}s` : `${seconds}s`; + const type = state.rateLimitType ? ` ${state.rateLimitType}` : ''; + return `Rate-limited (${state.provider}${type}); resuming in ${duration}; Ctrl+C to stop`; + } + + private subscribeRateLimitGovernor(input: { + trace: IngestTraceWriter; + memoryFlow?: MemoryFlowEventSink; + }): () => void { + const governor = this.deps.settings.rateLimitGovernor; + if (!governor) { + return () => undefined; + } + return governor.subscribe((state: RateLimitWaitState) => { + if (state.kind === 'rate_limit_observed') { + void input.trace.event('info', 'rate_limit', 'rate_limit_observed', { ...state }); + return; + } + if (state.kind === 'concurrency_adjusted') { + void input.trace.event('info', 'rate_limit', 'concurrency_adjusted', { ...state }); + return; + } + void input.trace.event('info', 'rate_limit', state.kind, { ...state }); + if (state.kind === 'wait_tick' || state.kind === 'wait_started') { + input.memoryFlow?.emit({ + type: 'rate_limit_wait', + provider: state.provider, + ...(state.rateLimitType ? { rateLimitType: state.rateLimitType } : {}), + resumeAtMs: state.resumeAtMs, + remainingMs: state.remainingMs, + }); + input.memoryFlow?.emit({ + type: 'stage_progress', + stage: 'integration', + percent: 50, + message: this.formatRateLimitWait(state), + transient: true, + }); + } + }); + } + + private async withRateLimitWorkSlot(abortSignal: AbortSignal | undefined, fn: () => Promise): Promise { + const governor = this.deps.settings.rateLimitGovernor; + if (!governor) { + return fn(); + } + const release = await governor.acquireWorkSlot(abortSignal); + try { + return await fn(); + } finally { + release(); + } + } + /** * When profiling is enabled — via the `KTX_PROFILE_INGEST` env var or the * `ingest.profile` config setting — read the job's trace + tool transcripts @@ -877,6 +944,7 @@ export class IngestBundleRunner { includeContextEvidenceTools: boolean; currentTableExists(tableRef: string): Promise; memoryFlow?: MemoryFlowEventSink; + abortSignal?: AbortSignal; wuSkillNames: string[]; onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void; }): Promise { @@ -1029,6 +1097,7 @@ export class IngestBundleRunner { jobId: input.job.jobId, toolFailureCount: (unitKey) => input.transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0, onStepFinish: input.onStepFinish, + abortSignal: input.abortSignal, }, input.wu, ); @@ -1524,7 +1593,8 @@ export class IngestBundleRunner { try { await Promise.all( workUnits.map((wu, index) => - limitWorkUnit(async () => { + limitWorkUnit(() => + this.withRateLimitWorkSlot(ctx?.abortSignal, async () => { const outcome = await runIsolatedWorkUnit({ unitIndex: index, ingestionBaseSha, @@ -1532,6 +1602,7 @@ export class IngestBundleRunner { patchDir, trace: runTrace, workUnit: wu, + abortSignal: ctx?.abortSignal, afterSuccess: (child) => copyTransientIngestEvidence(child.workdir, sessionWorktree.workdir), run: async (child) => { const scopedWikiService = this.deps.wikiService.forWorktree(child.workdir); @@ -1565,6 +1636,7 @@ export class IngestBundleRunner { includeContextEvidenceTools: adapter.evidenceIndexing === 'documents' && !!contextReport, currentTableExists: (tableRef) => this.tableRefExistsInSemanticLayer(scopedSemanticLayerService, slConnectionIds, tableRef), + abortSignal: ctx?.abortSignal, memoryFlow, wuSkillNames, onStepFinish: ({ stepIndex, stepBudget }) => { @@ -1594,7 +1666,8 @@ export class IngestBundleRunner { completedWorkUnits / workUnits.length, `${completedWorkUnits} of ${workUnits.length} work units complete`, ); - }), + }), + ), ), ); } catch (error) { @@ -1693,6 +1766,7 @@ export class IngestBundleRunner { reason: context.reason, maxAttempts: 1, stepBudget: 12, + abortSignal: ctx?.abortSignal, }); emitStageProgress( 'integration', @@ -1714,6 +1788,7 @@ export class IngestBundleRunner { repairKind: 'patch_semantic_gate', maxAttempts: 1, stepBudget: 16, + abortSignal: ctx?.abortSignal, }); emitStageProgress( 'integration', @@ -1993,6 +2068,7 @@ export class IngestBundleRunner { ); } : undefined, + abortSignal: ctx?.abortSignal, }); curatorReport = curatorOutcome.report; curatorWarnings = curatorOutcome.warnings; @@ -2038,6 +2114,7 @@ export class IngestBundleRunner { sourceKey: job.sourceKey, jobId: job.jobId, force: !!overrideReport, + abortSignal: ctx?.abortSignal, onStepFinish: stage4 ? ({ stepIndex, stepBudget }) => { emitStageProgress('reconciliation', 85, `Reconciling results: step ${stepIndex}/${stepBudget}`, { @@ -2470,6 +2547,7 @@ export class IngestBundleRunner { repairKind: 'final_artifact_gate', maxAttempts: 1, stepBudget: 16, + abortSignal: ctx?.abortSignal, }); isolatedDiffSummary.gateRepairAttempts += gateRepair.attempts; diff --git a/packages/cli/src/context/ingest/isolated-diff/textual-conflict-resolver.ts b/packages/cli/src/context/ingest/isolated-diff/textual-conflict-resolver.ts index 5ae551d1..c4a00448 100644 --- a/packages/cli/src/context/ingest/isolated-diff/textual-conflict-resolver.ts +++ b/packages/cli/src/context/ingest/isolated-diff/textual-conflict-resolver.ts @@ -19,6 +19,7 @@ export interface ResolveTextualConflictInput { reason: string; maxAttempts?: number; stepBudget?: number; + abortSignal?: AbortSignal; } const readIntegrationFileSchema = z.object({ @@ -208,6 +209,7 @@ export async function resolveTextualConflict( jobId: input.trace.context.jobId, unitKey: input.unitKey, }, + abortSignal: input.abortSignal, }), ); diff --git a/packages/cli/src/context/ingest/isolated-diff/work-unit-executor.ts b/packages/cli/src/context/ingest/isolated-diff/work-unit-executor.ts index 7475612e..5ab52102 100644 --- a/packages/cli/src/context/ingest/isolated-diff/work-unit-executor.ts +++ b/packages/cli/src/context/ingest/isolated-diff/work-unit-executor.ts @@ -14,6 +14,7 @@ export interface RunIsolatedWorkUnitInput { patchDir: string; trace: IngestTraceWriter; workUnit: WorkUnit; + abortSignal?: AbortSignal; run(child: IngestSessionWorktree): Promise; afterSuccess?(child: IngestSessionWorktree): Promise; } diff --git a/packages/cli/src/context/ingest/local-bundle-runtime.ts b/packages/cli/src/context/ingest/local-bundle-runtime.ts index 9d6aba95..e4c45b3f 100644 --- a/packages/cli/src/context/ingest/local-bundle-runtime.ts +++ b/packages/cli/src/context/ingest/local-bundle-runtime.ts @@ -12,6 +12,7 @@ import type { KtxSemanticLayerComputePort } from '../../context/daemon/semantic- import { createRuntimeToolDescriptorFromAiTool } from '../../context/llm/runtime-tools.js'; import { createLocalKtxLlmRuntimeFromConfig } from '../../context/llm/local-config.js'; import { KtxIngestEmbeddingPortAdapter } from '../../context/llm/embedding-port.js'; +import { createRateLimitGovernorConfig, RateLimitGovernor } from '../../context/llm/rate-limit-governor.js'; import { RuntimeAgentRunner, type AgentRunnerPort, type KtxLlmRuntimePort, type KtxRuntimeToolSet } from '../../context/llm/runtime-port.js'; import type { KtxEmbeddingProvider } from '../../llm/types.js'; import type { KtxLocalProject } from '../../context/project/project.js'; @@ -619,7 +620,7 @@ function localIngestLlmProviderGuardMessage(projectDir: string): string { ].join('\n'); } -function resolveAgentRunner(options: CreateLocalBundleIngestRuntimeOptions): { +function resolveAgentRunner(options: CreateLocalBundleIngestRuntimeOptions, rateLimitGovernor: RateLimitGovernor): { agentRunner: AgentRunnerPort; llmRuntime?: KtxLlmRuntimePort; } { @@ -628,6 +629,7 @@ function resolveAgentRunner(options: CreateLocalBundleIngestRuntimeOptions): { (options.createLlmRuntime ?? createLocalKtxLlmRuntimeFromConfig)(options.project.config.llm, { projectDir: options.project.projectDir, env: process.env, + rateLimitGovernor, }) ?? undefined; @@ -677,7 +679,13 @@ export function createLocalBundleIngestRuntime( const knowledgeIndex = new LocalKnowledgeIndex(options.project, embedding); const knowledgeEvents = new NoopKnowledgeEventPort(); const wikiService = new KnowledgeWikiService(rootFileStore, embedding, knowledgeIndex, options.project.git, logger); - const { agentRunner, llmRuntime } = resolveAgentRunner(options); + const rateLimitGovernor = new RateLimitGovernor( + createRateLimitGovernorConfig({ + ...options.project.config.ingest.rateLimit, + maxConcurrency: options.project.config.ingest.workUnits.maxConcurrency, + }), + ); + const { agentRunner, llmRuntime } = resolveAgentRunner(options, rateLimitGovernor); const promptService = new PromptService({ promptsDir, partials: [], logger }); const storage = new LocalIngestStorage(options.project); const registry = registerAdapters(options.adapters); @@ -717,6 +725,7 @@ export function createLocalBundleIngestRuntime( workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency, workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget, workUnitFailureMode: options.project.config.ingest.workUnits.failureMode, + rateLimitGovernor, profileIngest: options.project.config.ingest.profile, ingestTraceLevel: ingestTraceLevelFromEnv(), }, diff --git a/packages/cli/src/context/ingest/local-ingest.ts b/packages/cli/src/context/ingest/local-ingest.ts index ec8a72f4..1a219629 100644 --- a/packages/cli/src/context/ingest/local-ingest.ts +++ b/packages/cli/src/context/ingest/local-ingest.ts @@ -3,6 +3,7 @@ import { cp, mkdir, rm } from 'node:fs/promises'; import { isAbsolute, resolve } from 'node:path'; import type { KtxSqlQueryExecutorPort } from '../../context/connections/query-executor.js'; import type { KtxLogger } from '../../context/core/config.js'; +import { createAbortError, isAbortError } from '../../context/core/abort.js'; import type { KtxSemanticLayerComputePort } from '../../context/daemon/semantic-layer-compute.js'; import type { AgentRunnerPort, KtxLlmRuntimePort } from '../../context/llm/runtime-port.js'; import type { KtxLocalProject } from '../../context/project/project.js'; @@ -36,6 +37,7 @@ export interface RunLocalIngestOptions { queryExecutor?: KtxSqlQueryExecutorPort; logger?: KtxLogger; embeddingProvider?: import('../../llm/types.js').KtxEmbeddingProvider | null; + abortSignal?: AbortSignal; } export interface LocalIngestResult { @@ -123,10 +125,11 @@ function findAdapter(adapters: SourceAdapter[], source: string): SourceAdapter { return adapter; } -function localJobContext(jobId: string, memoryFlow?: MemoryFlowEventSink): IngestJobContext { +function localJobContext(jobId: string, memoryFlow?: MemoryFlowEventSink, abortSignal?: AbortSignal): IngestJobContext { return { jobId, ...(memoryFlow ? { memoryFlow } : {}), + ...(abortSignal ? { abortSignal } : {}), startPhase() { return new LocalIngestPhase(); }, @@ -158,6 +161,7 @@ async function runScheduledPullJob(options: { queryExecutor?: KtxSqlQueryExecutorPort; logger?: KtxLogger; embeddingProvider?: import('../../llm/types.js').KtxEmbeddingProvider | null; + abortSignal?: AbortSignal; }): Promise { const runtime = createLocalBundleIngestRuntime(options); const jobId = options.jobId ?? runtime.nextJobId(); @@ -169,7 +173,7 @@ async function runScheduledPullJob(options: { trigger: options.trigger ?? 'manual_resync', bundleRef: { kind: 'scheduled_pull', config: options.pullConfig }, }, - localJobContext(jobId, options.memoryFlow), + localJobContext(jobId, options.memoryFlow, options.abortSignal), ); const report = await runtime.store.findByJobId(jobId); if (!report) { @@ -212,6 +216,7 @@ export async function runLocalIngest(options: RunLocalIngestOptions): Promise KtxRuntimeToolSet; getReconciliationActions: () => MemoryAction[]; onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void; + abortSignal?: AbortSignal; }): Promise; } diff --git a/packages/cli/src/context/ingest/stages/stage-3-work-units.ts b/packages/cli/src/context/ingest/stages/stage-3-work-units.ts index ec514a02..a7387c8a 100644 --- a/packages/cli/src/context/ingest/stages/stage-3-work-units.ts +++ b/packages/cli/src/context/ingest/stages/stage-3-work-units.ts @@ -1,4 +1,5 @@ import type { KtxModelRole } from '../../../llm/types.js'; +import { isAbortError } from '../../core/abort.js'; import type { AgentRunnerPort, KtxRuntimeToolSet, RunLoopMetrics } from '../../../context/llm/runtime-port.js'; import type { CaptureSession, MemoryAction } from '../../../context/memory/types.js'; import { listTouchedSlSources, type TouchedSlSource } from '../../../context/tools/touched-sl-sources.js'; @@ -28,6 +29,7 @@ export interface WorkUnitExecutionDeps { connectionId: string; jobId: string; onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void; + abortSignal?: AbortSignal; toolFailureCount?: (unitKey: string) => number; } @@ -106,8 +108,12 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit) jobId: deps.jobId, }, onStepFinish: deps.onStepFinish, + abortSignal: deps.abortSignal, }); } catch (error) { + if (isAbortError(error)) { + throw error; + } return failWithResetFromCurrentHead(error instanceof Error ? error.message : String(error)); } diff --git a/packages/cli/src/context/ingest/stages/stage-4-reconciliation.ts b/packages/cli/src/context/ingest/stages/stage-4-reconciliation.ts index 5abc9bfb..c78e1b48 100644 --- a/packages/cli/src/context/ingest/stages/stage-4-reconciliation.ts +++ b/packages/cli/src/context/ingest/stages/stage-4-reconciliation.ts @@ -16,6 +16,7 @@ export interface ReconciliationContext { jobId: string; force?: boolean; onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void; + abortSignal?: AbortSignal; forceRun?: boolean; } @@ -40,6 +41,7 @@ export async function runReconciliationStage4(ctx: ReconciliationContext): Promi stepBudget: ctx.stepBudget, telemetryTags: { operationName: 'ingest-bundle-reconcile', source: ctx.sourceKey, jobId: ctx.jobId }, onStepFinish: ctx.onStepFinish, + abortSignal: ctx.abortSignal, }); return { skipped: false, stopReason: run.stopReason, error: run.error, ...(run.metrics ? { metrics: run.metrics } : {}) }; } diff --git a/packages/cli/src/context/ingest/types.ts b/packages/cli/src/context/ingest/types.ts index 337885af..925f3d82 100644 --- a/packages/cli/src/context/ingest/types.ts +++ b/packages/cli/src/context/ingest/types.ts @@ -220,5 +220,6 @@ export interface IngestJobPhase { export interface IngestJobContext { jobId: string; memoryFlow?: MemoryFlowEventSink; + abortSignal?: AbortSignal; startPhase(weight: number): IngestJobPhase; } diff --git a/packages/cli/src/context/llm/ai-sdk-runtime.ts b/packages/cli/src/context/llm/ai-sdk-runtime.ts index f5752355..d5a60c7b 100644 --- a/packages/cli/src/context/llm/ai-sdk-runtime.ts +++ b/packages/cli/src/context/llm/ai-sdk-runtime.ts @@ -3,7 +3,9 @@ import type { KtxLlmProvider } from '../../llm/types.js'; import { generateText, Output, stepCountIs, type FlexibleSchema, type TelemetrySettings, type ToolSet } from 'ai'; import type { z } from 'zod'; import { noopLogger, type KtxLogger } from '../../context/core/config.js'; +import { isAbortError } from '../core/abort.js'; import { summarizeKtxLlmDebugRequest, type KtxLlmDebugRequestRecorder } from './debug-request-recorder.js'; +import type { RateLimitGovernor, RateLimitProvider, RateLimitSignal } from './rate-limit-governor.js'; import { createAiSdkToolSet } from './runtime-tools.js'; import type { KtxGenerateObjectInput, @@ -40,12 +42,129 @@ export interface AiSdkKtxLlmRuntimeDeps { telemetry?: AgentTelemetryPort; logger?: KtxLogger; debugRequestRecorder?: KtxLlmDebugRequestRecorder; + rateLimitGovernor?: Pick; } function hasTools(tools: Record): boolean { return Object.keys(tools).length > 0; } +function modelProviderName(model: unknown): RateLimitProvider { + const provider = (model as { provider?: string }).provider ?? ''; + return provider.includes('vertex') || provider.includes('google') ? 'vertex' : 'anthropic-api'; +} + +interface HeaderLimitPair { + limit: string; + remaining: string; + rateLimitType: string; +} + +const RATE_LIMIT_HEADER_PAIRS: HeaderLimitPair[] = [ + { + limit: 'anthropic-ratelimit-requests-limit', + remaining: 'anthropic-ratelimit-requests-remaining', + rateLimitType: 'rpm', + }, + { + limit: 'anthropic-ratelimit-tokens-limit', + remaining: 'anthropic-ratelimit-tokens-remaining', + rateLimitType: 'tpm', + }, + { + limit: 'anthropic-ratelimit-input-tokens-limit', + remaining: 'anthropic-ratelimit-input-tokens-remaining', + rateLimitType: 'itpm', + }, + { + limit: 'anthropic-ratelimit-output-tokens-limit', + remaining: 'anthropic-ratelimit-output-tokens-remaining', + rateLimitType: 'otpm', + }, + { + limit: 'x-ratelimit-limit-requests', + remaining: 'x-ratelimit-remaining-requests', + rateLimitType: 'rpm', + }, + { + limit: 'x-ratelimit-limit-tokens', + remaining: 'x-ratelimit-remaining-tokens', + rateLimitType: 'tpm', + }, +]; + +function normalizeHeaders(headers: unknown): Record { + if (!headers || typeof headers !== 'object') { + return {}; + } + const get = (headers as { get?: unknown }).get; + if (typeof get === 'function') { + const out: Record = {}; + for (const pair of RATE_LIMIT_HEADER_PAIRS) { + const limit = get.call(headers, pair.limit); + const remaining = get.call(headers, pair.remaining); + if (typeof limit === 'string') out[pair.limit] = limit; + if (typeof remaining === 'string') out[pair.remaining] = remaining; + } + return out; + } + return Object.fromEntries( + Object.entries(headers as Record) + .filter((entry): entry is [string, string | number] => typeof entry[1] === 'string' || typeof entry[1] === 'number') + .map(([key, value]) => [key.toLowerCase(), String(value)]), + ); +} + +function numericHeader(headers: Record, key: string): number | undefined { + const value = Number(headers[key]); + return Number.isFinite(value) && value >= 0 ? value : undefined; +} + +function utilizationForPair(headers: Record, pair: HeaderLimitPair): number | undefined { + const limit = numericHeader(headers, pair.limit); + const remaining = numericHeader(headers, pair.remaining); + if (limit === undefined || remaining === undefined || limit <= 0) { + return undefined; + } + return 1 - Math.min(limit, remaining) / limit; +} + +function aiSdkHeaderRateLimitSignal(provider: RateLimitProvider, result: unknown): RateLimitSignal | undefined { + const headers = normalizeHeaders((result as { response?: { headers?: unknown } }).response?.headers); + let best: { utilization: number; rateLimitType: string } | undefined; + for (const pair of RATE_LIMIT_HEADER_PAIRS) { + const utilization = utilizationForPair(headers, pair); + if (utilization === undefined) { + continue; + } + if (!best || utilization > best.utilization) { + best = { utilization, rateLimitType: pair.rateLimitType }; + } + } + if (!best) { + return undefined; + } + return { + provider, + status: 'allowed', + rateLimitType: best.rateLimitType, + utilization: Number(best.utilization.toFixed(4)), + }; +} + +function retryAfterMs(error: unknown): number | undefined { + const value = (error as { retryAfter?: unknown }).retryAfter; + if (typeof value === 'number' && Number.isFinite(value) && value > 0) { + return value < 1_000 ? value * 1_000 : value; + } + return undefined; +} + +function isAiSdkRateLimitError(error: unknown): boolean { + const record = error as { name?: string; statusCode?: number; status?: number }; + return record.name === 'TooManyRequestsError' || record.statusCode === 429 || record.status === 429; +} + export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { private readonly logger: KtxLogger; @@ -53,6 +172,41 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { this.logger = deps.logger ?? noopLogger; } + private async generateTextWithRateLimitRetry( + provider: RateLimitProvider, + abortSignal: AbortSignal | undefined, + run: () => Promise, + ): Promise { + // maxRetryAttempts() returns 1 when no governor is present or pacing is + // disabled, so a 429 throws immediately instead of hammering the provider + // with no backoff; the AI SDK's own maxRetries still handles transient 429s. + const maxAttempts = this.deps.rateLimitGovernor?.maxRetryAttempts() ?? 1; + let attempt = 0; + while (true) { + await this.deps.rateLimitGovernor?.waitForReady(abortSignal); + try { + const result = await run(); + const signal = aiSdkHeaderRateLimitSignal(provider, result); + if (signal) { + this.deps.rateLimitGovernor?.report(signal); + } + return result; + } catch (error) { + if (isAbortError(error) || !isAiSdkRateLimitError(error) || attempt >= maxAttempts - 1) { + throw error; + } + attempt += 1; + const retryAfter = retryAfterMs(error); + this.deps.rateLimitGovernor?.report({ + provider, + status: 'rejected', + rateLimitType: 'http_429', + ...(retryAfter !== undefined ? { retryAfterMs: retryAfter } : {}), + }); + } + } + } + async generateText(input: KtxGenerateTextInput): Promise { const model = this.deps.llmProvider.getModel(input.role); if ((model as { provider?: string }).provider === 'deterministic') { @@ -67,12 +221,13 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { }); const split = splitKtxSystemMessages(built.messages); const startedAt = Date.now(); - const result = await generateText({ + const request = { model, temperature: input.temperature ?? 0, ...(split.system ? { system: split.system } : {}), messages: split.messages, tools: built.tools as ToolSet, + ...(input.abortSignal ? { abortSignal: input.abortSignal } : {}), ...(hasTools(tools) ? { experimental_repairToolCall: this.deps.llmProvider.repairToolCallHandler({ @@ -80,7 +235,8 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { }), } : {}), - }); + }; + const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), input.abortSignal, () => generateText(request)); input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: toLlmTokenUsage(result.totalUsage ?? result.usage) }); if (typeof result.text !== 'string') { throw new Error('KTX LLM text generation returned no text'); @@ -101,12 +257,13 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { }); const split = splitKtxSystemMessages(built.messages); const startedAt = Date.now(); - const result = await generateText({ + const request = { model, temperature: input.temperature ?? 0, ...(split.system ? { system: split.system } : {}), messages: split.messages, tools: built.tools as ToolSet, + ...(input.abortSignal ? { abortSignal: input.abortSignal } : {}), ...(hasTools(tools) ? { experimental_repairToolCall: this.deps.llmProvider.repairToolCallHandler({ @@ -115,7 +272,8 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { } : {}), output: Output.object({ schema: input.schema as unknown as FlexibleSchema }), - }); + }; + const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), input.abortSignal, () => generateText(request)); input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: toLlmTokenUsage(result.totalUsage ?? result.usage) }); if (result.output == null) { throw new Error('KTX LLM object generation returned no output'); @@ -152,7 +310,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { }), ); - const result = await generateText({ + const request = { model, temperature: 0, stopWhen: stepCountIs(params.stepBudget), @@ -163,6 +321,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { ...(promptMessages.system ? { system: promptMessages.system } : {}), messages: promptMessages.messages, tools: built.tools as ToolSet, + ...(params.abortSignal ? { abortSignal: params.abortSignal } : {}), onStepFinish: async () => { stepIndex += 1; stepBoundariesMs.push(Date.now() - startedAt); @@ -179,7 +338,8 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { ); } }, - }); + }; + const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), params.abortSignal, () => generateText(request)); return { stopReason: 'natural', metrics: { @@ -190,6 +350,9 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort { }, }; } catch (error) { + if (isAbortError(error)) { + throw error; + } const err = error instanceof Error ? error : new Error(String(error)); this.logger.warn(`[agent-runner] loop failed: ${err.message}`); return { diff --git a/packages/cli/src/context/llm/claude-code-runtime.ts b/packages/cli/src/context/llm/claude-code-runtime.ts index 0c1e6881..26bd0529 100644 --- a/packages/cli/src/context/llm/claude-code-runtime.ts +++ b/packages/cli/src/context/llm/claude-code-runtime.ts @@ -7,8 +7,10 @@ import { } from '@anthropic-ai/claude-agent-sdk'; import { z } from 'zod'; import { noopLogger, type KtxLogger } from '../../context/core/config.js'; +import { createAbortError, isAbortError, throwIfAborted } from '../core/abort.js'; import { createKtxClaudeCodeEnv } from './claude-code-env.js'; import { resolveClaudeCodeModel } from './claude-code-models.js'; +import type { RateLimitGovernor, RateLimitSignal } from './rate-limit-governor.js'; import { createClaudeSdkTools, mcpToolIds } from './runtime-tools.js'; import type { KtxGenerateObjectInput, @@ -21,7 +23,16 @@ import type { RunLoopStopReason, } from './runtime-port.js'; -type QueryFn = (params: Parameters[0]) => AsyncIterable; +type QueryResult = AsyncIterable & { + interrupt?: () => void | Promise; +}; + +type QueryFn = (params: Parameters[0]) => QueryResult; + +interface ClaudeQueryOutcome { + result: SDKResultMessage; + rejectedRateLimitSignal?: RateLimitSignal; +} function claudeTokenUsage(result: SDKResultMessage): LlmTokenUsage { const usage = (result as { usage?: { input_tokens?: number; output_tokens?: number } }).usage; @@ -43,6 +54,7 @@ export interface ClaudeCodeKtxLlmRuntimeDeps { query?: QueryFn; env?: NodeJS.ProcessEnv; logger?: KtxLogger; + rateLimitGovernor?: Pick; } const BUILTIN_TOOLS = [ @@ -157,6 +169,74 @@ function expectedMcpServerNames(tools: KtxRuntimeToolSet | undefined): Set 0 ? new Set([KTX_MCP_SERVER_NAME]) : new Set(); } +const CLAUDE_RATE_LIMIT_ERROR_MARKERS = /\b429\b|rate limit|too many requests|quota exceeded|overloaded|max_retries/i; + +function normalizeClaudeResetAtMs(value: unknown): number | undefined { + if (typeof value === 'number' && Number.isFinite(value) && value > 0) { + return Math.round(value < 10_000_000_000 ? value * 1_000 : value); + } + if (typeof value === 'string') { + const numeric = Number(value); + if (Number.isFinite(numeric) && numeric > 0) { + return normalizeClaudeResetAtMs(numeric); + } + const parsed = Date.parse(value); + return Number.isFinite(parsed) ? parsed : undefined; + } + return undefined; +} + +function isClaudeRateLimitResult(result: SDKResultMessage, rejectedSignal: RateLimitSignal | undefined): boolean { + const error = resultError(result); + if (!error) { + return false; + } + if (rejectedSignal?.status === 'rejected') { + return true; + } + const resultDetails = result as { + stop_reason?: unknown; + terminal_reason?: unknown; + errors?: unknown[]; + }; + const details = [ + error.message, + resultDetails.stop_reason, + resultDetails.terminal_reason, + ...(resultDetails.errors ?? []), + ] + .filter((value): value is string => typeof value === 'string' && value.length > 0) + .join('\n'); + return CLAUDE_RATE_LIMIT_ERROR_MARKERS.test(details); +} + +function claudeRateLimitSignal(message: SDKMessage): RateLimitSignal | null { + const record = message as unknown as Record; + if (record.type === 'rate_limit_event') { + const info = record.rate_limit_info as Record | undefined; + if (!info) return null; + const rawStatus = typeof info.status === 'string' ? info.status : 'allowed'; + const resetAtMs = normalizeClaudeResetAtMs(info.resetsAt); + return { + provider: 'claude-subscription', + status: rawStatus === 'rejected' ? 'rejected' : rawStatus === 'allowed_warning' ? 'warning' : 'allowed', + ...(resetAtMs !== undefined ? { resetAtMs } : {}), + ...(typeof info.rateLimitType === 'string' ? { rateLimitType: info.rateLimitType } : {}), + ...(typeof info.utilization === 'number' ? { utilization: info.utilization } : {}), + }; + } + if (record.subtype === 'api_retry' || record.type === 'api_retry') { + const retryDelayMs = typeof record.retry_delay_ms === 'number' ? record.retry_delay_ms : undefined; + return { + provider: 'claude-subscription', + status: 'warning', + ...(retryDelayMs !== undefined ? { retryAfterMs: retryDelayMs } : {}), + rateLimitType: 'api_retry', + }; + } + return null; +} + function managedMcpSettings(serverNames: string[]): NonNullable { return { allowManagedMcpServersOnly: true, @@ -217,21 +297,63 @@ async function collectResult(params: { allowedToolIds: Set; expectedMcpServerNames: Set; onAssistantTurn?: () => Promise; -}): Promise { + rateLimitGovernor?: Pick; + abortSignal?: AbortSignal; +}): Promise { let result: SDKResultMessage | undefined; - for await (const message of params.query({ prompt: params.prompt, options: params.options })) { - assertInitIsolation(message, params.allowedToolIds, params.expectedMcpServerNames); - if (countsAsAssistantTurn(message)) { - await params.onAssistantTurn?.(); - } - if (isResult(message)) { - result = message; + let rejectedRateLimitSignal: RateLimitSignal | undefined; + throwIfAborted(params.abortSignal); + await params.rateLimitGovernor?.waitForReady(params.abortSignal); + throwIfAborted(params.abortSignal); + const queryResult = params.query({ prompt: params.prompt, options: params.options }); + const onAbort = () => { + void Promise.resolve(queryResult.interrupt?.()).catch(() => undefined); + }; + params.abortSignal?.addEventListener('abort', onAbort, { once: true }); + try { + for await (const message of queryResult) { + throwIfAborted(params.abortSignal); + const rateLimitSignal = claudeRateLimitSignal(message); + if (rateLimitSignal) { + if (rateLimitSignal.status === 'rejected') { + rejectedRateLimitSignal = rateLimitSignal; + } + params.rateLimitGovernor?.report(rateLimitSignal); + } + assertInitIsolation(message, params.allowedToolIds, params.expectedMcpServerNames); + if (countsAsAssistantTurn(message)) { + await params.onAssistantTurn?.(); + } + if (isResult(message)) { + result = message; + } } + } finally { + params.abortSignal?.removeEventListener('abort', onAbort); + } + if (params.abortSignal?.aborted) { + throw createAbortError(); } if (!result) { throw new Error('Claude Code query returned no result message'); } - return result; + return { + result, + ...(rejectedRateLimitSignal ? { rejectedRateLimitSignal } : {}), + }; +} + +async function collectResultWithRateLimitRetry(params: Parameters[0]): Promise { + // maxRetryAttempts() returns 1 when no governor is present or pacing is + // disabled, so a rate-limited result surfaces without an extra query; the + // Claude Code SDK applies its own backoff for transient rejections. + const maxAttempts = params.rateLimitGovernor?.maxRetryAttempts() ?? 1; + for (let attempt = 0; ; attempt += 1) { + const outcome = await collectResult(params); + if (!isClaudeRateLimitResult(outcome.result, outcome.rejectedRateLimitSignal) || attempt >= maxAttempts - 1) { + return outcome.result; + } + } } export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { @@ -252,12 +374,14 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { tools: input.tools, }); const startedAt = Date.now(); - const result = await collectResult({ + const result = await collectResultWithRateLimitRetry({ query: this.runQuery, prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'), options, allowedToolIds: new Set(mcpToolIds(input.tools ?? {})), expectedMcpServerNames: expectedMcpServerNames(input.tools), + rateLimitGovernor: this.deps.rateLimitGovernor, + abortSignal: input.abortSignal, }); input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: claudeTokenUsage(result) }); const error = resultError(result); @@ -289,12 +413,14 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { outputFormat: { type: 'json_schema' as const, schema: jsonSchema(input.schema as z.ZodType) }, }; const startedAt = Date.now(); - const result = await collectResult({ + const result = await collectResultWithRateLimitRetry({ query: this.runQuery, prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'), options, allowedToolIds: new Set([...mcpToolIds(input.tools ?? {}), STRUCTURED_OUTPUT_TOOL_NAME]), expectedMcpServerNames: expectedMcpServerNames(input.tools), + rateLimitGovernor: this.deps.rateLimitGovernor, + abortSignal: input.abortSignal, }); input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: claudeTokenUsage(result) }); const error = resultError(result); @@ -319,12 +445,14 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { maxTurns: params.stepBudget, tools: params.toolSet, }); - const result = await collectResult({ + const result = await collectResultWithRateLimitRetry({ query: this.runQuery, prompt: params.userPrompt, options: { ...options, systemPrompt: params.systemPrompt }, allowedToolIds: new Set(mcpToolIds(params.toolSet)), expectedMcpServerNames: expectedMcpServerNames(params.toolSet), + rateLimitGovernor: this.deps.rateLimitGovernor, + abortSignal: params.abortSignal, onAssistantTurn: async () => { stepIndex += 1; stepBoundariesMs.push(Date.now() - startedAt); @@ -355,6 +483,9 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort { }, }; } catch (error) { + if (isAbortError(error)) { + throw error; + } const err = error instanceof Error ? error : new Error(String(error)); return { stopReason: 'error', @@ -388,7 +519,7 @@ export async function runClaudeCodeAuthProbe(input: { env: input.env, maxTurns: 1, }); - const result = await collectResult({ + const result = await collectResultWithRateLimitRetry({ query: input.query ?? defaultQuery, prompt: 'Reply with exactly: ok', options, diff --git a/packages/cli/src/context/llm/codex-runtime.ts b/packages/cli/src/context/llm/codex-runtime.ts index 3535072b..2958b3f8 100644 --- a/packages/cli/src/context/llm/codex-runtime.ts +++ b/packages/cli/src/context/llm/codex-runtime.ts @@ -1,5 +1,6 @@ import { z } from 'zod'; import { noopLogger, type KtxLogger } from '../core/config.js'; +import { isAbortError, linkAbortSignal } from '../core/abort.js'; import { isCompletedAgentStep, summarizeCodexExecEvents, type CodexExecEventSummary } from './codex-exec-events.js'; import { startCodexRuntimeMcpServer, @@ -8,6 +9,7 @@ import { import { resolveCodexModel } from './codex-models.js'; import { buildCodexRuntimeConfig } from './codex-runtime-config.js'; import { CodexSdkCliRunner, type CodexSdkRunner } from './codex-sdk-runner.js'; +import type { RateLimitGovernor } from './rate-limit-governor.js'; import type { KtxGenerateObjectInput, KtxGenerateTextInput, @@ -24,6 +26,7 @@ export interface CodexKtxLlmRuntimeDeps { runner?: CodexSdkRunner; startMcpServer?: (input: { projectDir: string; toolSet: KtxRuntimeToolSet }) => Promise; logger?: KtxLogger; + rateLimitGovernor?: Pick; } function modelForRole(modelSlots: CodexKtxLlmRuntimeDeps['modelSlots'], role: string): string { @@ -159,6 +162,12 @@ function runtimeToolNames(toolSet: KtxRuntimeToolSet | undefined): string[] { return Object.values(toolSet ?? {}).map((descriptor) => descriptor.name); } +const CODEX_RATE_LIMIT_MARKERS = /\b429\b|rate limit|too many requests|quota exceeded|temporarily overloaded/i; + +function isCodexRateLimitError(error: Error | undefined): boolean { + return !!error && CODEX_RATE_LIMIT_MARKERS.test(error.message); +} + export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { private readonly runner: CodexSdkRunner; private readonly logger: KtxLogger; @@ -168,6 +177,37 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { this.logger = deps.logger ?? noopLogger; } + private async runWithRateLimitRetry( + abortSignal: AbortSignal | undefined, + run: () => Promise, + getError: (result: T) => Error | undefined, + ): Promise { + // maxRetryAttempts() returns 1 when no governor is present or pacing is + // disabled, so an opaque rate-limit failure surfaces on the first attempt + // instead of being retried with no backoff. + const maxAttempts = this.deps.rateLimitGovernor?.maxRetryAttempts() ?? 1; + for (let attempt = 0; ; attempt += 1) { + await this.deps.rateLimitGovernor?.waitForReady(abortSignal); + const lastAttempt = attempt >= maxAttempts - 1; + try { + const result = await run(); + const error = getError(result); + if (!isCodexRateLimitError(error) || lastAttempt) { + return result; + } + } catch (error) { + if (isAbortError(error)) { + throw error; + } + const err = error instanceof Error ? error : new Error(String(error)); + if (!isCodexRateLimitError(err) || lastAttempt) { + throw error; + } + } + this.deps.rateLimitGovernor?.report({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' }); + } + } + async generateText(input: KtxGenerateTextInput): Promise { const startedAt = Date.now(); const model = modelForRole(this.deps.modelSlots, input.role); @@ -190,18 +230,26 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { } : {}), }); - const collected = await collectEvents( - await this.runner.runStreamed({ - projectDir: this.deps.projectDir, - model, - prompt: promptWithSystem(input.system, input.prompt), - configOverrides: config.configOverrides, - env: config.env, - }), + const result = await this.runWithRateLimitRetry( + input.abortSignal, + async () => { + const collected = await collectEvents( + await this.runner.runStreamed({ + projectDir: this.deps.projectDir, + model, + prompt: promptWithSystem(input.system, input.prompt), + configOverrides: config.configOverrides, + env: config.env, + ...(input.abortSignal ? { signal: input.abortSignal } : {}), + }), + ); + const summary = summarizeCodexExecEvents(collected.events, { startedAt }); + return { collected, summary }; + }, + ({ collected, summary }) => summaryError(summary, collected.streamError), ); - const summary = summarizeCodexExecEvents(collected.events, { startedAt }); - input.onMetrics?.(metrics(summary, startedAt)); - return assertSuccessfulText(summary, collected.streamError); + input.onMetrics?.(metrics(result.summary, startedAt)); + return assertSuccessfulText(result.summary, result.collected.streamError); } finally { await mcp?.close(); } @@ -231,19 +279,27 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { } : {}), }); - const collected = await collectEvents( - await this.runner.runStreamed({ - projectDir: this.deps.projectDir, - model, - prompt: promptWithSystem(input.system, input.prompt), - configOverrides: config.configOverrides, - env: config.env, - outputSchema: z.toJSONSchema(input.schema, { target: 'draft-7' }) as Record, - }), + const result = await this.runWithRateLimitRetry( + input.abortSignal, + async () => { + const collected = await collectEvents( + await this.runner.runStreamed({ + projectDir: this.deps.projectDir, + model, + prompt: promptWithSystem(input.system, input.prompt), + configOverrides: config.configOverrides, + env: config.env, + outputSchema: z.toJSONSchema(input.schema, { target: 'draft-7' }) as Record, + ...(input.abortSignal ? { signal: input.abortSignal } : {}), + }), + ); + const summary = summarizeCodexExecEvents(collected.events, { startedAt }); + return { collected, summary }; + }, + ({ collected, summary }) => summaryError(summary, collected.streamError), ); - const summary = summarizeCodexExecEvents(collected.events, { startedAt }); - input.onMetrics?.(metrics(summary, startedAt)); - return parseStructuredOutput(input.schema, assertSuccessfulText(summary, collected.streamError)); + input.onMetrics?.(metrics(result.summary, startedAt)); + return parseStructuredOutput(input.schema, assertSuccessfulText(result.summary, result.collected.streamError)); } finally { await mcp?.close(); } @@ -272,7 +328,6 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { } : {}), }); - const abortController = new AbortController(); const onStep = async (stepIndex: number): Promise => { try { await params.onStepFinish?.({ stepIndex, stepBudget: params.stepBudget }); @@ -282,31 +337,50 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { ); } }; - const collected = await collectEvents( - await this.runner.runStreamed({ - projectDir: this.deps.projectDir, - model, - prompt: promptWithSystem(params.systemPrompt, params.userPrompt), - configOverrides: config.configOverrides, - env: config.env, - signal: abortController.signal, - }), - { stepBudget: params.stepBudget, abortController, onStep }, + const result = await this.runWithRateLimitRetry( + params.abortSignal, + async () => { + const linked = linkAbortSignal(params.abortSignal); + const abortController = linked.controller; + try { + const collected = await collectEvents( + await this.runner.runStreamed({ + projectDir: this.deps.projectDir, + model, + prompt: promptWithSystem(params.systemPrompt, params.userPrompt), + configOverrides: config.configOverrides, + env: config.env, + signal: abortController.signal, + }), + { stepBudget: params.stepBudget, abortController, onStep }, + ); + const summary = summarizeCodexExecEvents(collected.events, { startedAt }); + return { collected, summary }; + } finally { + linked.dispose(); + } + }, + ({ collected, summary }) => summaryError(summary, collected.streamError), ); - const summary = summarizeCodexExecEvents(collected.events, { startedAt }); - const error = summaryError(summary, collected.streamError); - const stopReason = collected.budgetExceeded ? 'budget' : error ? 'error' : summary.stopReason; + const error = summaryError(result.summary, result.collected.streamError); + if (isAbortError(error)) { + throw error; + } + const stopReason = result.collected.budgetExceeded ? 'budget' : error ? 'error' : result.summary.stopReason; return { stopReason, ...(stopReason === 'error' && error ? { error } : {}), metrics: { totalMs: Date.now() - startedAt, - usage: summary.usage, - stepCount: summary.stepCount, - stepBoundariesMs: summary.stepBoundariesMs, + usage: result.summary.usage, + stepCount: result.summary.stepCount, + stepBoundariesMs: result.summary.stepBoundariesMs, }, }; } catch (error) { + if (isAbortError(error)) { + throw error; + } const err = error instanceof Error ? error : new Error(String(error)); return { stopReason: 'error', diff --git a/packages/cli/src/context/llm/local-config.ts b/packages/cli/src/context/llm/local-config.ts index 58bd29a5..4c2502d1 100644 --- a/packages/cli/src/context/llm/local-config.ts +++ b/packages/cli/src/context/llm/local-config.ts @@ -6,16 +6,28 @@ import type { KtxProjectEmbeddingConfig, KtxProjectLlmConfig } from '../project/ import { AiSdkKtxLlmRuntime } from './ai-sdk-runtime.js'; import { ClaudeCodeKtxLlmRuntime } from './claude-code-runtime.js'; import { CodexKtxLlmRuntime } from './codex-runtime.js'; +import type { RateLimitGovernor } from './rate-limit-governor.js'; import type { KtxLlmRuntimePort } from './runtime-port.js'; +type ClaudeCodeRuntimeDeps = ConstructorParameters[0] & { + rateLimitGovernor?: RateLimitGovernor; +}; +type CodexRuntimeDeps = ConstructorParameters[0] & { + rateLimitGovernor?: RateLimitGovernor; +}; +type AiSdkRuntimeDeps = ConstructorParameters[0] & { + rateLimitGovernor?: RateLimitGovernor; +}; + interface LocalConfigDeps { env?: NodeJS.ProcessEnv; projectDir?: string; + rateLimitGovernor?: RateLimitGovernor; createKtxLlmProvider?: typeof createKtxLlmProvider; createKtxEmbeddingProvider?: typeof createKtxEmbeddingProvider; - createClaudeCodeRuntime?: (deps: ConstructorParameters[0]) => KtxLlmRuntimePort; - createCodexRuntime?: (deps: ConstructorParameters[0]) => KtxLlmRuntimePort; - createAiSdkRuntime?: (deps: { llmProvider: KtxLlmProvider }) => KtxLlmRuntimePort; + createClaudeCodeRuntime?: (deps: ClaudeCodeRuntimeDeps) => KtxLlmRuntimePort; + createCodexRuntime?: (deps: CodexRuntimeDeps) => KtxLlmRuntimePort; + createAiSdkRuntime?: (deps: AiSdkRuntimeDeps) => KtxLlmRuntimePort; } function resolveOptional(value: string | undefined, env: NodeJS.ProcessEnv): string | undefined { @@ -129,6 +141,7 @@ export function createLocalKtxLlmRuntimeFromConfig( projectDir, modelSlots: resolved.modelSlots, env: deps.env, + rateLimitGovernor: deps.rateLimitGovernor, }); } if (resolved.backend === 'codex') { @@ -139,10 +152,14 @@ export function createLocalKtxLlmRuntimeFromConfig( return (deps.createCodexRuntime ?? ((runtimeDeps) => new CodexKtxLlmRuntime(runtimeDeps)))({ projectDir, modelSlots: resolved.modelSlots, + rateLimitGovernor: deps.rateLimitGovernor, }); } const llmProvider = (deps.createKtxLlmProvider ?? createKtxLlmProvider)(resolved); - return (deps.createAiSdkRuntime ?? ((runtimeDeps) => new AiSdkKtxLlmRuntime(runtimeDeps)))({ llmProvider }); + return (deps.createAiSdkRuntime ?? ((runtimeDeps) => new AiSdkKtxLlmRuntime(runtimeDeps)))({ + llmProvider, + rateLimitGovernor: deps.rateLimitGovernor, + }); } export function resolveLocalKtxEmbeddingConfig( diff --git a/packages/cli/src/context/llm/rate-limit-governor.ts b/packages/cli/src/context/llm/rate-limit-governor.ts new file mode 100644 index 00000000..909e4c44 --- /dev/null +++ b/packages/cli/src/context/llm/rate-limit-governor.ts @@ -0,0 +1,387 @@ +import { createAbortError, throwIfAborted } from '../core/abort.js'; + +export type RateLimitProvider = 'claude-subscription' | 'anthropic-api' | 'vertex' | 'codex'; +type RateLimitSignalStatus = 'allowed' | 'warning' | 'rejected'; + +export interface RateLimitSignal { + provider: RateLimitProvider; + status: RateLimitSignalStatus; + resetAtMs?: number; + retryAfterMs?: number; + utilization?: number; + rateLimitType?: string; +} + +export interface RateLimitRetryConfig { + maxAttempts: number; + baseDelayMs: number; + maxDelayMs: number; + jitter: boolean; +} + +export interface RateLimitGovernorConfig { + enabled: boolean; + maxConcurrency: number; + throttleThreshold: number; + minConcurrencyUnderPressure: number; + maxWaitMs?: number; + waitStateTickMs: number; + retry: RateLimitRetryConfig; +} + +export type RateLimitWaitState = + | { + kind: 'rate_limit_observed'; + provider: RateLimitProvider; + status: RateLimitSignalStatus; + rateLimitType?: string; + resetAtMs?: number; + retryAfterMs?: number; + utilization?: number; + } + | { + kind: 'concurrency_adjusted'; + provider: RateLimitProvider; + from: number; + to: number; + reason: string; + rateLimitType?: string; + utilization?: number; + } + | { + kind: 'wait_started' | 'wait_tick' | 'wait_finished'; + provider: RateLimitProvider; + rateLimitType?: string; + resumeAtMs: number; + remainingMs: number; + }; + +export interface RateLimitGovernorDeps { + now?: () => number; + sleep?: (ms: number, signal?: AbortSignal) => Promise; + random?: () => number; +} + +export type RateLimitRelease = () => void; +type Subscriber = (state: RateLimitWaitState) => void; + +const defaultSleep = (ms: number, signal?: AbortSignal): Promise => + new Promise((resolve, reject) => { + if (signal?.aborted) { + reject(createAbortError()); + return; + } + const timeout = setTimeout(resolve, ms); + signal?.addEventListener( + 'abort', + () => { + clearTimeout(timeout); + reject(createAbortError()); + }, + { once: true }, + ); + }); + +export function createRateLimitGovernorConfig( + input: Partial & { retry?: Partial } = {}, +): RateLimitGovernorConfig { + return { + enabled: input.enabled ?? true, + maxConcurrency: input.maxConcurrency ?? 1, + throttleThreshold: input.throttleThreshold ?? 0.8, + minConcurrencyUnderPressure: input.minConcurrencyUnderPressure ?? 1, + ...(input.maxWaitMs !== undefined ? { maxWaitMs: input.maxWaitMs } : {}), + waitStateTickMs: input.waitStateTickMs ?? 1_000, + retry: { + maxAttempts: input.retry?.maxAttempts ?? 6, + baseDelayMs: input.retry?.baseDelayMs ?? 1_000, + maxDelayMs: input.retry?.maxDelayMs ?? 60_000, + jitter: input.retry?.jitter ?? true, + }, + }; +} + +export class RateLimitGovernor { + private readonly now: () => number; + private readonly sleep: (ms: number, signal?: AbortSignal) => Promise; + private readonly random: () => number; + private readonly subscribers = new Set(); + private waiters: Array<() => void> = []; + private active = 0; + private effectiveLimit: number; + private pausedUntilMs: number | null = null; + private pausedProvider: RateLimitProvider | null = null; + private pausedRateLimitType: string | undefined; + private pausedTickMs: number | null = null; + private opaqueAttempts = new Map(); + private pauseGeneration = 0; + private visibleWaitAbort: AbortController | null = null; + + constructor( + private readonly config: RateLimitGovernorConfig, + deps: RateLimitGovernorDeps = {}, + ) { + this.now = deps.now ?? Date.now; + this.sleep = deps.sleep ?? defaultSleep; + this.random = deps.random ?? Math.random; + this.effectiveLimit = Math.max(1, config.maxConcurrency); + } + + currentLimit(): number { + return this.config.enabled ? this.effectiveLimit : this.config.maxConcurrency; + } + + /** + * Total attempts a runtime should make for a single rate-limited LLM call, + * including the first try. Returns 1 (no outer retry) when pacing is disabled: + * the outer retry loop only exists to cooperate with this governor's pause, so + * without active pacing there is no backoff to apply and the backend's own + * retry handles transient rejections. + */ + maxRetryAttempts(): number { + return this.config.enabled ? Math.max(1, this.config.retry.maxAttempts) : 1; + } + + activeSlots(): number { + return this.active; + } + + subscribe(cb: Subscriber): () => void { + this.subscribers.add(cb); + if (this.pausedUntilMs !== null) { + this.startVisibleWaitTicker(); + } + return () => { + this.subscribers.delete(cb); + if (this.subscribers.size === 0) { + this.stopVisibleWaitTicker(); + this.wakeWaiters(); + } + }; + } + + report(signal: RateLimitSignal): void { + if (!this.config.enabled) { + return; + } + this.emit({ + kind: 'rate_limit_observed', + provider: signal.provider, + status: signal.status, + ...(signal.rateLimitType ? { rateLimitType: signal.rateLimitType } : {}), + ...(signal.resetAtMs !== undefined ? { resetAtMs: signal.resetAtMs } : {}), + ...(signal.retryAfterMs !== undefined ? { retryAfterMs: signal.retryAfterMs } : {}), + ...(signal.utilization !== undefined ? { utilization: signal.utilization } : {}), + }); + + if (signal.status === 'rejected') { + this.applyPause(signal); + return; + } + + if (signal.status === 'warning' || (signal.utilization ?? 0) >= this.config.throttleThreshold) { + this.adjustLimit(Math.max(1, this.config.minConcurrencyUnderPressure), signal, 'provider pressure'); + return; + } + + this.opaqueAttempts.delete(signal.provider); + if ((signal.utilization ?? 0) < this.config.throttleThreshold) { + this.adjustLimit(Math.max(1, this.config.maxConcurrency), signal, 'provider recovered'); + } + } + + async waitForReady(signal?: AbortSignal): Promise { + throwIfAborted(signal); + if (!this.config.enabled) { + return; + } + await this.waitForPause(signal); + throwIfAborted(signal); + } + + async acquireWorkSlot(signal?: AbortSignal): Promise { + throwIfAborted(signal); + if (!this.config.enabled) { + this.active += 1; + return () => { + this.active -= 1; + }; + } + + while (true) { + throwIfAborted(signal); + await this.waitForPause(signal); + throwIfAborted(signal); + if (this.active < this.effectiveLimit) { + this.active += 1; + let released = false; + return () => { + if (released) return; + released = true; + this.active -= 1; + this.wakeWaiters(); + }; + } + await this.waitForSlot(signal); + } + } + + private applyPause(signal: RateLimitSignal): void { + const resumeAtMs = this.resumeAtMsFor(signal); + const boundedResumeAtMs = + this.config.maxWaitMs === undefined ? resumeAtMs : Math.min(resumeAtMs, this.now() + this.config.maxWaitMs); + if (this.pausedUntilMs === null || boundedResumeAtMs > this.pausedUntilMs) { + this.pausedUntilMs = boundedResumeAtMs; + this.pausedProvider = signal.provider; + this.pausedRateLimitType = signal.rateLimitType; + this.pausedTickMs = signal.rateLimitType === 'opaque' ? Math.max(1, boundedResumeAtMs - this.now()) : null; + this.emitWait('wait_started'); + this.startVisibleWaitTicker(); + this.wakeWaiters(); + } + this.adjustLimit(Math.max(1, this.config.minConcurrencyUnderPressure), signal, 'provider rejected'); + } + + private resumeAtMsFor(signal: RateLimitSignal): number { + if (signal.resetAtMs !== undefined) { + return signal.resetAtMs; + } + if (signal.retryAfterMs !== undefined) { + return this.now() + signal.retryAfterMs; + } + const attempts = this.opaqueAttempts.get(signal.provider) ?? 0; + this.opaqueAttempts.set(signal.provider, Math.min(attempts + 1, this.config.retry.maxAttempts)); + const base = Math.min( + this.config.retry.maxDelayMs, + this.config.retry.baseDelayMs * 2 ** Math.min(attempts, this.config.retry.maxAttempts - 1), + ); + const jitterMultiplier = this.config.retry.jitter ? 0.75 + this.random() * 0.5 : 1; + return this.now() + Math.round(base * jitterMultiplier); + } + + private adjustLimit(to: number, signal: RateLimitSignal, reason: string): void { + const bounded = Math.max(1, Math.min(this.config.maxConcurrency, to)); + if (bounded === this.effectiveLimit) { + return; + } + const from = this.effectiveLimit; + this.effectiveLimit = bounded; + this.emit({ + kind: 'concurrency_adjusted', + provider: signal.provider, + from, + to: bounded, + reason, + ...(signal.rateLimitType ? { rateLimitType: signal.rateLimitType } : {}), + ...(signal.utilization !== undefined ? { utilization: signal.utilization } : {}), + }); + this.wakeWaiters(); + } + + private startVisibleWaitTicker(): void { + if (this.subscribers.size === 0 || this.pausedUntilMs === null) { + return; + } + this.stopVisibleWaitTicker(); + const generation = (this.pauseGeneration += 1); + const controller = new AbortController(); + this.visibleWaitAbort = controller; + void this.runVisibleWaitTicker(generation, controller.signal).catch(() => undefined); + } + + private stopVisibleWaitTicker(): void { + this.visibleWaitAbort?.abort(); + this.visibleWaitAbort = null; + } + + private async runVisibleWaitTicker(generation: number, signal: AbortSignal): Promise { + while (!signal.aborted && generation === this.pauseGeneration && this.pausedUntilMs !== null) { + const remainingMs = this.pausedUntilMs - this.now(); + if (remainingMs <= 0) { + this.finishPause(generation); + return; + } + this.emitWait('wait_tick'); + await this.sleep(Math.min(this.pausedTickMs ?? this.config.waitStateTickMs, remainingMs), signal); + } + } + + private finishPause(generation?: number): void { + if (generation !== undefined && generation !== this.pauseGeneration) { + return; + } + this.emitWait('wait_finished'); + this.pausedUntilMs = null; + this.pausedProvider = null; + this.pausedRateLimitType = undefined; + this.pausedTickMs = null; + this.stopVisibleWaitTicker(); + this.wakeWaiters(); + } + + private async waitForPause(signal?: AbortSignal): Promise { + throwIfAborted(signal); + while (this.pausedUntilMs !== null) { + const remainingMs = this.pausedUntilMs - this.now(); + if (remainingMs <= 0) { + this.finishPause(); + return; + } + if (this.visibleWaitAbort !== null) { + await this.waitForSlot(signal); + } else { + await this.sleep(Math.min(this.pausedTickMs ?? this.config.waitStateTickMs, remainingMs), signal); + } + throwIfAborted(signal); + } + } + + private waitForSlot(signal?: AbortSignal): Promise { + if (signal?.aborted) { + return Promise.reject(createAbortError()); + } + return new Promise((resolve, reject) => { + const wake = () => { + cleanup(); + resolve(); + }; + const onAbort = () => { + cleanup(); + reject(createAbortError()); + }; + const cleanup = () => { + this.waiters = this.waiters.filter((candidate) => candidate !== wake); + signal?.removeEventListener('abort', onAbort); + }; + this.waiters.push(wake); + signal?.addEventListener('abort', onAbort, { once: true }); + }); + } + + private wakeWaiters(): void { + const waiters = this.waiters; + this.waiters = []; + for (const waiter of waiters) { + waiter(); + } + } + + private emitWait(kind: Extract): void { + if (this.pausedUntilMs === null || this.pausedProvider === null) { + return; + } + this.emit({ + kind, + provider: this.pausedProvider, + ...(this.pausedRateLimitType ? { rateLimitType: this.pausedRateLimitType } : {}), + resumeAtMs: this.pausedUntilMs, + remainingMs: Math.max(0, this.pausedUntilMs - this.now()), + }); + } + + private emit(state: RateLimitWaitState): void { + for (const subscriber of this.subscribers) { + subscriber(state); + } + } +} diff --git a/packages/cli/src/context/llm/runtime-port.ts b/packages/cli/src/context/llm/runtime-port.ts index db648448..9fec6208 100644 --- a/packages/cli/src/context/llm/runtime-port.ts +++ b/packages/cli/src/context/llm/runtime-port.ts @@ -49,6 +49,7 @@ export interface RunLoopParams { stepBudget: number; telemetryTags: Record; onStepFinish?: (info: RunLoopStepInfo) => void | Promise; + abortSignal?: AbortSignal; } export interface RunLoopResult { @@ -64,6 +65,7 @@ export interface KtxGenerateTextInput { tools?: KtxRuntimeToolSet; temperature?: number; onMetrics?: (metrics: { totalMs: number; usage: LlmTokenUsage }) => void; + abortSignal?: AbortSignal; } export interface KtxGenerateObjectInput> { @@ -74,6 +76,7 @@ export interface KtxGenerateObjectInput void; + abortSignal?: AbortSignal; } export interface KtxLlmRuntimePort { diff --git a/packages/cli/src/context/project/config.ts b/packages/cli/src/context/project/config.ts index cbea79b6..fd7f482c 100644 --- a/packages/cli/src/context/project/config.ts +++ b/packages/cli/src/context/project/config.ts @@ -100,6 +100,44 @@ const workUnitsSchema = z }) .describe('Concurrency and failure handling for ingest work units.'); +const ingestRateLimitRetrySchema = z + .strictObject({ + maxAttempts: z + .int() + .positive() + .default(6) + .describe( + 'Maximum attempts for a single rate-limited LLM call before the failure surfaces, counting the first try. Also bounds how far opaque backoff grows for providers that do not expose a reset time.', + ), + baseDelayMs: z.int().positive().default(1_000).describe('Initial opaque retry delay in milliseconds.'), + maxDelayMs: z.int().positive().default(60_000).describe('Maximum opaque retry delay in milliseconds.'), + jitter: z.boolean().default(true).describe('When true, apply bounded jitter to opaque retry delays.'), + }) + .describe('Retry policy for rate-limit responses that do not include a reset time or retry-after value.'); + +const ingestRateLimitSchema = z + .strictObject({ + enabled: z.boolean().default(true).describe('Master switch for ingest LLM rate-limit pacing and visible waits.'), + throttleThreshold: z + .number() + .min(0) + .max(1) + .default(0.8) + .describe('Provider utilization at or above which ingest throttles new work-unit starts.'), + minConcurrencyUnderPressure: z + .int() + .positive() + .default(1) + .describe('Effective work-unit concurrency while a provider is under rate-limit pressure.'), + maxWaitMs: z + .int() + .positive() + .optional() + .describe('Optional cap on a single provider reset wait. Omit to wait indefinitely until the provider reset time.'), + retry: ingestRateLimitRetrySchema.prefault({}).describe('Opaque retry policy for providers without reset hints.'), + }) + .describe('Rate-limit pacing and wait policy for ingest LLM calls.'); + const ingestSchema = z .strictObject({ adapters: z @@ -110,6 +148,7 @@ const ingestSchema = z .prefault({ backend: 'none' }) .describe('Embedding configuration used when ingest adapters need to embed documents.'), workUnits: workUnitsSchema.prefault({}).describe('Concurrency and failure handling for ingest work units.'), + rateLimit: ingestRateLimitSchema.prefault({}).describe('LLM rate-limit pacing and visible-wait policy for ingest.'), profile: z .union([z.boolean(), z.literal('json')]) .default(false) diff --git a/packages/cli/src/ingest.ts b/packages/cli/src/ingest.ts index ad5ba270..319c3d1b 100644 --- a/packages/cli/src/ingest.ts +++ b/packages/cli/src/ingest.ts @@ -78,6 +78,7 @@ export interface KtxIngestDeps { readReportFile?: typeof readIngestReportSnapshotFile; renderStoredMemoryFlow?: typeof renderMemoryFlowTui; startLiveMemoryFlow?: typeof startLiveMemoryFlowTui; + abortSignal?: AbortSignal; env?: NodeJS.ProcessEnv; localIngestOptions?: Pick< RunLocalIngestOptions, @@ -93,6 +94,23 @@ export interface KtxIngestDeps { runtimeIo?: KtxIngestIo; } +function createCliAbortSignal(): { signal: AbortSignal; dispose: () => void } { + const controller = new AbortController(); + let interrupted = false; + const onSigint = () => { + if (interrupted) { + process.exit(130); + } + interrupted = true; + controller.abort(new DOMException('Aborted', 'AbortError')); + }; + process.on('SIGINT', onSigint); + return { + signal: controller.signal, + dispose: () => process.off('SIGINT', onSigint), + }; +} + const REPORT_SOURCE_LABELS = new Map([ ['live-database', 'Database schema'], ['historic-sql', 'Query history'], @@ -364,6 +382,12 @@ function plainIngestEventProgress( message: event.message, ...(event.transient !== undefined ? { transient: event.transient } : {}), }; + case 'rate_limit_wait': + return { + percent: 50, + message: `Rate-limited (${event.provider}${event.rateLimitType ? ` ${event.rateLimitType}` : ''}); resuming in ${Math.ceil(event.remainingMs / 1_000)}s`, + transient: true, + }; case 'work_unit_started': { const total = plannedWorkUnitCountThrough(snapshot, eventIndex); const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey); @@ -750,6 +774,8 @@ export async function runKtxIngest( ); plainProgress?.start(); structuredProgress?.start(); + const cliAbort = deps.abortSignal ? null : createCliAbortSignal(); + const abortSignal = deps.abortSignal ?? cliAbort?.signal; let result: LocalMetabaseFanoutResult; try { result = await executeMetabaseFanout({ @@ -763,6 +789,7 @@ export async function runKtxIngest( embeddingProvider, ...(memoryFlow ? { memoryFlow } : {}), ...(progress ? { progress } : {}), + ...(abortSignal ? { abortSignal } : {}), }); plainProgress?.flush(); if (args.outputMode === 'json') { @@ -772,6 +799,7 @@ export async function runKtxIngest( } } finally { plainProgress?.flush(); + cliAbort?.dispose(); } return result.status === 'all_failed' ? 1 : 0; } @@ -820,6 +848,8 @@ export async function runKtxIngest( plainProgress?.start(); structuredProgress?.start(); + const cliAbort = deps.abortSignal ? null : createCliAbortSignal(); + const abortSignal = deps.abortSignal ?? cliAbort?.signal; try { const result = await executeLocalIngest({ @@ -836,6 +866,7 @@ export async function runKtxIngest( embeddingProvider, ...(args.debugLlmRequestFile ? { llmDebugRequestFile: args.debugLlmRequestFile } : {}), ...(memoryFlow ? { memoryFlow } : {}), + ...(abortSignal ? { abortSignal } : {}), }); if (shouldUseLiveViz && memoryFlow) { latestMemoryFlowSnapshot = finalRunMemoryFlowInput(memoryFlow.snapshot(), result.report); @@ -854,6 +885,7 @@ export async function runKtxIngest( } finally { plainProgress?.flush(); liveTui?.close(); + cliAbort?.dispose(); } } diff --git a/packages/cli/test/context/core/abort.test.ts b/packages/cli/test/context/core/abort.test.ts new file mode 100644 index 00000000..aed46c1e --- /dev/null +++ b/packages/cli/test/context/core/abort.test.ts @@ -0,0 +1,31 @@ +import { describe, expect, it, vi } from 'vitest'; +import { createAbortError, isAbortError, linkAbortSignal, throwIfAborted } from '../../../src/context/core/abort.js'; + +describe('abort helpers', () => { + it('recognizes DOMException abort errors and common abort-shaped errors', () => { + expect(isAbortError(createAbortError())).toBe(true); + expect(isAbortError(Object.assign(new Error('cancelled'), { name: 'AbortError' }))).toBe(true); + expect(isAbortError(Object.assign(new Error('operation aborted'), { code: 'ABORT_ERR' }))).toBe(true); + expect(isAbortError(new Error('ordinary failure'))).toBe(false); + }); + + it('throws when the provided signal is already aborted', () => { + const controller = new AbortController(); + controller.abort(); + + expect(() => throwIfAborted(controller.signal)).toThrow(/Aborted/); + }); + + it('links a child controller to a parent signal and removes the listener on dispose', () => { + const parent = new AbortController(); + const child = linkAbortSignal(parent.signal); + + expect(child.controller.signal.aborted).toBe(false); + parent.abort(); + expect(child.controller.signal.aborted).toBe(true); + + const removeSpy = vi.spyOn(parent.signal, 'removeEventListener'); + child.dispose(); + expect(removeSpy).toHaveBeenCalledWith('abort', expect.any(Function)); + }); +}); diff --git a/packages/cli/test/context/ingest/ingest-bundle.runner.test.ts b/packages/cli/test/context/ingest/ingest-bundle.runner.test.ts index 447cd01e..b491acf2 100644 --- a/packages/cli/test/context/ingest/ingest-bundle.runner.test.ts +++ b/packages/cli/test/context/ingest/ingest-bundle.runner.test.ts @@ -426,6 +426,177 @@ describe('IngestBundleRunner — Stages 1 → 7', () => { ); }); + it('uses the rate-limit governor for work-unit start slots', async () => { + const deps = makeDeps(); + const acquireWorkSlot = vi.fn(async () => vi.fn()); + const runner = buildRunner(deps, { + settings: { + probeRowCount: 1, + memoryIngestionModel: 'test-model', + workUnitMaxConcurrency: 2, + rateLimitGovernor: { acquireWorkSlot, subscribe: vi.fn(() => vi.fn()) } as never, + }, + }); + deps.adapter.chunk.mockResolvedValue({ + workUnits: [ + { unitKey: 'u1', rawFiles: ['a.yml'], peerFileIndex: [], dependencyPaths: [] }, + { unitKey: 'u2', rawFiles: ['b.yml'], peerFileIndex: [], dependencyPaths: [] }, + ], + }); + (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ + currentHashes: new Map([ + ['a.yml', 'h1'], + ['b.yml', 'h2'], + ]), + rawDirInWorktree: 'raw-sources/c1/fake/s', + }); + (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x'); + + await runner.run({ + jobId: 'j1', + connectionId: 'c1', + sourceKey: 'fake', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload-x' }, + }); + + expect(acquireWorkSlot).toHaveBeenCalledTimes(2); + }); + + it('passes the job abort signal into rate-limit work-unit slots', async () => { + const deps = makeDeps(); + const controller = new AbortController(); + const acquireWorkSlot = vi.fn(async () => vi.fn()); + const runner = buildRunner(deps, { + settings: { + probeRowCount: 1, + memoryIngestionModel: 'test-model', + workUnitMaxConcurrency: 1, + rateLimitGovernor: { acquireWorkSlot, subscribe: vi.fn(() => vi.fn()) } as never, + }, + }); + (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ + currentHashes: new Map([['a.yml', 'h1']]), + rawDirInWorktree: 'raw-sources/c1/fake/s', + }); + (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x'); + + await runner.run( + { + jobId: 'j1', + connectionId: 'c1', + sourceKey: 'fake', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload-x' }, + }, + { jobId: 'j1', abortSignal: controller.signal, startPhase: () => new TestJobContext('j1', null, async () => undefined, async () => undefined) } as any, + ); + + expect(acquireWorkSlot).toHaveBeenCalledWith(controller.signal); + }); + + it('does not convert aborted work-unit agent loops into failed work units', async () => { + const deps = makeDeps(); + const controller = new AbortController(); + deps.agentRunner.runLoop.mockImplementation(async () => { + controller.abort(); + throw new DOMException('Aborted', 'AbortError'); + }); + const runner = buildRunner(deps, { + settings: { + probeRowCount: 1, + memoryIngestionModel: 'test-model', + workUnitMaxConcurrency: 1, + }, + }); + (runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({ + currentHashes: new Map([['a.yml', 'h1']]), + rawDirInWorktree: 'raw-sources/c1/fake/s', + }); + (runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x'); + + await expect( + runner.run( + { + jobId: 'j1', + connectionId: 'c1', + sourceKey: 'fake', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload-x' }, + }, + { jobId: 'j1', abortSignal: controller.signal, startPhase: () => new TestJobContext('j1', null, async () => undefined, async () => undefined) } as any, + ), + ).rejects.toThrow(/Aborted/); + + expect(deps.runsRepo.markFailed).toHaveBeenCalledWith('run-1'); + expect(deps.reportsRepo.create).not.toHaveBeenCalledWith( + expect.objectContaining({ + body: expect.objectContaining({ + failedWorkUnits: expect.arrayContaining(['u1']), + }), + }), + ); + }); + + it('emits trace and memory-flow status for rate-limit waits', async () => { + const deps = makeDeps(); + let subscriber: ((state: any) => void) | undefined; + const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput()); + const runner = buildRunner(deps, { + settings: { + probeRowCount: 1, + memoryIngestionModel: 'test-model', + rateLimitGovernor: { + acquireWorkSlot: vi.fn(async () => vi.fn()), + subscribe: vi.fn((cb: (state: any) => void) => { + subscriber = cb; + return vi.fn(); + }), + } as never, + }, + }); + (runner as any).runInner = async (_job: any, ctx: any) => { + subscriber?.({ + kind: 'wait_tick', + provider: 'claude-subscription', + rateLimitType: 'five_hour', + resumeAtMs: 2_000, + remainingMs: 1_000, + }); + ctx.memoryFlow.emit({ type: 'report_created', runId: 'run-1' }); + return { + runId: 'run-1', + syncId: 'sync-1', + diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 }, + workUnitCount: 0, + failedWorkUnits: [], + artifactsWritten: 0, + commitSha: null, + }; + }; + + await runner.run( + { + jobId: 'j1', + connectionId: 'c1', + sourceKey: 'fake', + trigger: 'upload', + bundleRef: { kind: 'upload', uploadId: 'upload-x' }, + }, + { memoryFlow } as any, + ); + + expect(memoryFlow.snapshot().events).toContainEqual( + expect.objectContaining({ + type: 'rate_limit_wait', + provider: 'claude-subscription', + rateLimitType: 'five_hour', + resumeAtMs: 2_000, + remainingMs: 1_000, + }), + ); + }); + it('fails before squash when reconciliation leaves a touched wiki page with dangling refs', async () => { const deps = makeDeps(); let currentToolSession: any = null; diff --git a/packages/cli/test/context/ingest/local-bundle-runtime.test.ts b/packages/cli/test/context/ingest/local-bundle-runtime.test.ts index 9d1ec9b4..e3031cc5 100644 --- a/packages/cli/test/context/ingest/local-bundle-runtime.test.ts +++ b/packages/cli/test/context/ingest/local-bundle-runtime.test.ts @@ -301,6 +301,7 @@ describe('createLocalBundleIngestRuntime', () => { 'memoryIngestionModel', 'probeRowCount', 'profileIngest', + 'rateLimitGovernor', 'workUnitFailureMode', 'workUnitMaxConcurrency', 'workUnitStepBudget', diff --git a/packages/cli/test/context/ingest/memory-flow/schema.test.ts b/packages/cli/test/context/ingest/memory-flow/schema.test.ts index 1aaeec4b..ee8f3bb9 100644 --- a/packages/cli/test/context/ingest/memory-flow/schema.test.ts +++ b/packages/cli/test/context/ingest/memory-flow/schema.test.ts @@ -146,6 +146,29 @@ describe('memory-flow schemas', () => { expect(parsed.events).toContainEqual({ type: 'stage_skipped', stage: 'actions', reason: 'requires LLM' }); }); + it('accepts rate-limit wait replay events', () => { + expect( + memoryFlowReplayInputSchema.parse({ + ...snapshot(), + events: [ + { + type: 'rate_limit_wait', + provider: 'claude-subscription', + rateLimitType: 'five_hour', + resumeAtMs: 2_000, + remainingMs: 1_000, + }, + ], + }).events[0], + ).toEqual({ + type: 'rate_limit_wait', + provider: 'claude-subscription', + rateLimitType: 'five_hour', + resumeAtMs: 2_000, + remainingMs: 1_000, + }); + }); + it('parses snapshot and closed stream events', () => { expect(memoryFlowStreamEventSchema.parse({ type: 'snapshot', snapshot: snapshot({ status: 'done' }) })).toEqual({ type: 'snapshot', diff --git a/packages/cli/test/context/llm/ai-sdk-runtime.test.ts b/packages/cli/test/context/llm/ai-sdk-runtime.test.ts index 74987094..bab7d1d7 100644 --- a/packages/cli/test/context/llm/ai-sdk-runtime.test.ts +++ b/packages/cli/test/context/llm/ai-sdk-runtime.test.ts @@ -107,6 +107,199 @@ describe('AiSdkKtxLlmRuntime.runAgentLoop', () => { expect(result.error).toBe(err); }); + it('reports AI SDK retry-after rate limits and retries through the governor', async () => { + const waitForReady = vi.fn().mockResolvedValue(undefined); + const report = vi.fn(); + const rateLimitError = Object.assign(new Error('too many requests'), { + name: 'TooManyRequestsError', + retryAfter: 2, + statusCode: 429, + }); + (generateText as any).mockRejectedValueOnce(rateLimitError).mockResolvedValueOnce({ + text: 'done', + toolCalls: [], + steps: [], + usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2 }, + }); + const runtime = new AiSdkKtxLlmRuntime({ + llmProvider: llmProvider as any, + rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never, + }); + + const result = await runtime.runAgentLoop({ + modelRole: 'candidateExtraction', + systemPrompt: '', + userPrompt: '', + toolSet: {}, + stepBudget: 10, + telemetryTags: {}, + }); + + expect(result.stopReason).toBe('natural'); + expect(report).toHaveBeenCalledWith({ + provider: 'anthropic-api', + status: 'rejected', + retryAfterMs: 2_000, + rateLimitType: 'http_429', + }); + expect(waitForReady).toHaveBeenCalledTimes(2); + expect(generateText).toHaveBeenCalledTimes(2); + }); + + it('does not retry AI SDK rate limits without a governor', async () => { + const rateLimitError = Object.assign(new Error('too many requests'), { + name: 'TooManyRequestsError', + statusCode: 429, + }); + (generateText as any).mockRejectedValue(rateLimitError); + // The beforeEach runtime is constructed without a rateLimitGovernor. + + const result = await runtime.runAgentLoop({ + modelRole: 'candidateExtraction', + systemPrompt: '', + userPrompt: '', + toolSet: {}, + stepBudget: 10, + telemetryTags: {}, + }); + + expect(result.stopReason).toBe('error'); + expect(generateText).toHaveBeenCalledTimes(1); + }); + + it('honors a governor retry budget of one attempt without retrying', async () => { + const waitForReady = vi.fn().mockResolvedValue(undefined); + const report = vi.fn(); + const rateLimitError = Object.assign(new Error('too many requests'), { + name: 'TooManyRequestsError', + statusCode: 429, + }); + (generateText as any).mockRejectedValue(rateLimitError); + const runtime = new AiSdkKtxLlmRuntime({ + llmProvider: llmProvider as any, + rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 1 } as never, + }); + + const result = await runtime.runAgentLoop({ + modelRole: 'candidateExtraction', + systemPrompt: '', + userPrompt: '', + toolSet: {}, + stepBudget: 10, + telemetryTags: {}, + }); + + expect(result.stopReason).toBe('error'); + expect(generateText).toHaveBeenCalledTimes(1); + expect(report).not.toHaveBeenCalled(); + }); + + it('reports Anthropic API response-header utilization to the governor', async () => { + const waitForReady = vi.fn().mockResolvedValue(undefined); + const report = vi.fn(); + (generateText as any).mockResolvedValue({ + text: 'done', + toolCalls: [], + steps: [], + response: { + headers: { + 'anthropic-ratelimit-requests-limit': '100', + 'anthropic-ratelimit-requests-remaining': '8', + 'anthropic-ratelimit-input-tokens-limit': '10000', + 'anthropic-ratelimit-input-tokens-remaining': '9000', + }, + }, + }); + const runtime = new AiSdkKtxLlmRuntime({ + llmProvider: llmProvider as any, + rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never, + }); + + const result = await runtime.runAgentLoop({ + modelRole: 'candidateExtraction', + systemPrompt: '', + userPrompt: '', + toolSet: {}, + stepBudget: 10, + telemetryTags: {}, + }); + + expect(result.stopReason).toBe('natural'); + expect(report).toHaveBeenCalledWith({ + provider: 'anthropic-api', + status: 'allowed', + rateLimitType: 'rpm', + utilization: 0.92, + }); + }); + + it('reports generic x-ratelimit response-header utilization for Vertex providers', async () => { + const waitForReady = vi.fn().mockResolvedValue(undefined); + const report = vi.fn(); + const vertexProvider = { + ...llmProvider, + getModel: vi.fn().mockReturnValue({ modelId: 'gemini-3-pro', provider: 'google-vertex' }), + }; + (generateText as any).mockResolvedValue({ + text: 'done', + toolCalls: [], + steps: [], + response: { + headers: { + 'x-ratelimit-limit-requests': '200', + 'x-ratelimit-remaining-requests': '30', + 'x-ratelimit-limit-tokens': '100000', + 'x-ratelimit-remaining-tokens': '4000', + }, + }, + }); + const runtime = new AiSdkKtxLlmRuntime({ + llmProvider: vertexProvider as any, + rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never, + }); + + const result = await runtime.runAgentLoop({ + modelRole: 'candidateExtraction', + systemPrompt: '', + userPrompt: '', + toolSet: {}, + stepBudget: 10, + telemetryTags: {}, + }); + + expect(result.stopReason).toBe('natural'); + expect(report).toHaveBeenCalledWith({ + provider: 'vertex', + status: 'allowed', + rateLimitType: 'tpm', + utilization: 0.96, + }); + }); + + it('passes abort signals into governor waits and AI SDK generateText calls', async () => { + const controller = new AbortController(); + const waitForReady = vi.fn().mockResolvedValue(undefined); + (generateText as any).mockResolvedValue({ text: 'done', toolCalls: [], steps: [] }); + const runtime = new AiSdkKtxLlmRuntime({ + llmProvider: llmProvider as any, + rateLimitGovernor: { waitForReady, report: vi.fn(), maxRetryAttempts: () => 6 } as never, + }); + + const result = await runtime.runAgentLoop({ + modelRole: 'candidateExtraction', + systemPrompt: '', + userPrompt: '', + toolSet: {}, + stepBudget: 10, + telemetryTags: {}, + abortSignal: controller.signal, + }); + + expect(result.stopReason).toBe('natural'); + expect(waitForReady).toHaveBeenCalledWith(controller.signal); + expect((generateText as any).mock.calls[0][0].abortSignal).toBe(controller.signal); + }); + it('returns metrics with stepCount, per-step boundaries, and aggregate token usage', async () => { (generateText as any).mockImplementation(async (opts: any) => { await opts.onStepFinish({}); diff --git a/packages/cli/test/context/llm/claude-code-runtime.test.ts b/packages/cli/test/context/llm/claude-code-runtime.test.ts index 5c56c26c..ba83cde6 100644 --- a/packages/cli/test/context/llm/claude-code-runtime.test.ts +++ b/packages/cli/test/context/llm/claude-code-runtime.test.ts @@ -9,6 +9,14 @@ async function* stream(messages: SDKMessage[]): AsyncGenerator } } +function deferred() { + let resolve!: (value: T | PromiseLike) => void; + const promise = new Promise((innerResolve) => { + resolve = innerResolve; + }); + return { promise, resolve }; +} + function initMessage(overrides: Partial> = {}): Extract< SDKMessage, { type: 'system'; subtype: 'init' } @@ -91,6 +99,247 @@ describe('ClaudeCodeKtxLlmRuntime', () => { }); }); + it('waits before Claude Code text generation and reports rate-limit events', async () => { + const waitForReady = vi.fn().mockResolvedValue(undefined); + const report = vi.fn(); + const query = vi.fn((_input: any) => + stream([ + { + type: 'rate_limit_event', + rate_limit_info: { + status: 'allowed_warning', + resetsAt: new Date(2_000).toISOString(), + rateLimitType: 'five_hour', + utilization: 0.91, + }, + } as unknown as SDKMessage, + resultMessage({ result: 'ok' }), + ]), + ); + const runtime = new ClaudeCodeKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'sonnet' }, + query, + env: {}, + rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never, + }); + + await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok'); + expect(waitForReady).toHaveBeenCalledTimes(1); + expect(report).toHaveBeenCalledWith({ + provider: 'claude-subscription', + status: 'warning', + resetAtMs: 2_000, + rateLimitType: 'five_hour', + utilization: 0.91, + }); + }); + + it('maps numeric Claude Code reset times from SDK rate-limit events', async () => { + const report = vi.fn(); + const resetAtMs = 1_700_000_000_000; + const query = vi.fn((_input: any) => + stream([ + { + type: 'rate_limit_event', + rate_limit_info: { + status: 'rejected', + resetsAt: resetAtMs, + rateLimitType: 'five_hour', + utilization: 1, + }, + } as unknown as SDKMessage, + resultMessage({ result: 'ok' }), + ]), + ); + const runtime = new ClaudeCodeKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'sonnet' }, + query, + env: {}, + rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report, maxRetryAttempts: () => 6 } as never, + }); + + await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok'); + + expect(report).toHaveBeenCalledWith({ + provider: 'claude-subscription', + status: 'rejected', + resetAtMs, + rateLimitType: 'five_hour', + utilization: 1, + }); + }); + + it('retries a Claude Code query after an SDK rate-limit result error', async () => { + const waitForReady = vi.fn().mockResolvedValue(undefined); + const report = vi.fn(); + const resetAtMs = 1_700_000_000_000; + const query = vi + .fn() + .mockReturnValueOnce( + stream([ + { + type: 'rate_limit_event', + rate_limit_info: { + status: 'rejected', + resetsAt: resetAtMs, + rateLimitType: 'five_hour', + utilization: 1, + }, + } as unknown as SDKMessage, + resultMessage({ + subtype: 'error_during_execution', + is_error: true, + result: '', + errors: ['rate limit retry budget exhausted'], + terminal_reason: 'model_error', + } as never), + ]), + ) + .mockReturnValueOnce(stream([resultMessage({ result: 'ok' })])); + const runtime = new ClaudeCodeKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'sonnet' }, + query, + env: {}, + rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never, + }); + + await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok'); + + expect(query).toHaveBeenCalledTimes(2); + expect(waitForReady).toHaveBeenCalledTimes(2); + expect(report).toHaveBeenCalledWith({ + provider: 'claude-subscription', + status: 'rejected', + resetAtMs, + rateLimitType: 'five_hour', + utilization: 1, + }); + }); + + it('reports Claude Code api retry messages as warning signals', async () => { + const report = vi.fn(); + const query = vi.fn((_input: any) => + stream([ + { + type: 'system', + subtype: 'api_retry', + retry_delay_ms: 12_000, + } as unknown as SDKMessage, + resultMessage({ result: 'ok' }), + ]), + ); + const runtime = new ClaudeCodeKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'sonnet' }, + query, + env: {}, + rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report, maxRetryAttempts: () => 6 } as never, + }); + + await runtime.generateText({ role: 'default', prompt: 'hello' }); + expect(report).toHaveBeenCalledWith({ + provider: 'claude-subscription', + status: 'warning', + retryAfterMs: 12_000, + rateLimitType: 'api_retry', + }); + }); + + it('passes abort signals into Claude Code governor waits', async () => { + const controller = new AbortController(); + const waitForReady = vi.fn().mockResolvedValue(undefined); + const query = vi.fn((_input: any) => stream([resultMessage({ result: 'ok' })])); + const runtime = new ClaudeCodeKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'sonnet' }, + query, + env: {}, + rateLimitGovernor: { waitForReady, report: vi.fn(), maxRetryAttempts: () => 6 } as never, + }); + + await expect(runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal })).resolves.toBe('ok'); + + expect(waitForReady).toHaveBeenCalledWith(controller.signal); + }); + + it('interrupts an active Claude Code query when the abort signal fires', async () => { + const controller = new AbortController(); + const streamStarted = deferred(); + const releaseStream = deferred(); + const interrupt = vi.fn(() => releaseStream.resolve()); + const queryResult = { + async *[Symbol.asyncIterator]() { + streamStarted.resolve(); + await releaseStream.promise; + yield resultMessage({ result: 'ok' }); + }, + interrupt, + }; + const query = vi.fn(() => queryResult as never); + const runtime = new ClaudeCodeKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'sonnet' }, + query, + env: {}, + rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report: vi.fn(), maxRetryAttempts: () => 6 } as never, + }); + + const pending = runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal }); + await streamStarted.promise; + controller.abort(); + + await expect(pending).rejects.toThrow(/Aborted/); + expect(interrupt).toHaveBeenCalledTimes(1); + }); + + it('throws abort before starting Claude Code query when the signal is already aborted', async () => { + const controller = new AbortController(); + controller.abort(); + const query = vi.fn((_input: any) => stream([resultMessage({ result: 'ok' })])); + const runtime = new ClaudeCodeKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'sonnet' }, + query, + env: {}, + rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report: vi.fn(), maxRetryAttempts: () => 6 } as never, + }); + + await expect(runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal })).rejects.toThrow(/Aborted/); + expect(query).not.toHaveBeenCalled(); + }); + + it('treats an interrupted Claude Code stream with no result as abort', async () => { + const controller = new AbortController(); + const streamStarted = deferred(); + const releaseStream = deferred(); + const interrupt = vi.fn(() => releaseStream.resolve()); + const queryResult = { + async *[Symbol.asyncIterator]() { + streamStarted.resolve(); + await releaseStream.promise; + }, + interrupt, + }; + const query = vi.fn(() => queryResult as never); + const runtime = new ClaudeCodeKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'sonnet' }, + query, + env: {}, + rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report: vi.fn(), maxRetryAttempts: () => 6 } as never, + }); + + const pending = runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal }); + await streamStarted.promise; + controller.abort(); + + await expect(pending).rejects.toThrow(/Aborted/); + expect(interrupt).toHaveBeenCalledTimes(1); + }); + it('validates structured output with the caller schema and whitelists the SDK StructuredOutput tool', async () => { const schema = z.object({ answer: z.string() }); const query = vi.fn((_input: any) => diff --git a/packages/cli/test/context/llm/codex-runtime.test.ts b/packages/cli/test/context/llm/codex-runtime.test.ts index 2d408543..4c3fcdfd 100644 --- a/packages/cli/test/context/llm/codex-runtime.test.ts +++ b/packages/cli/test/context/llm/codex-runtime.test.ts @@ -130,6 +130,150 @@ describe('CodexKtxLlmRuntime', () => { ).rejects.toThrow('Codex structured output failed validation'); }); + it('reports Codex rate-limit failures and retries with opaque backoff', async () => { + const waitForReady = vi.fn().mockResolvedValue(undefined); + const report = vi.fn(); + const fakeRunner = { + runStreamed: vi + .fn() + .mockResolvedValueOnce(events([{ type: 'turn.failed', error: { message: '429 rate limit exceeded' } }])) + .mockResolvedValueOnce( + events([ + { type: 'turn.started' }, + { type: 'item.completed', item: { type: 'agent_message', text: 'ok' } }, + { type: 'turn.completed' }, + ]), + ), + }; + const runtime = new CodexKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'codex' }, + runner: fakeRunner, + rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never, + }); + + await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok'); + expect(report).toHaveBeenCalledWith({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' }); + expect(waitForReady).toHaveBeenCalledTimes(2); + expect(fakeRunner.runStreamed).toHaveBeenCalledTimes(2); + }); + + it('reports thrown Codex rate-limit failures and retries with opaque backoff', async () => { + const waitForReady = vi.fn().mockResolvedValue(undefined); + const report = vi.fn(); + const fakeRunner = { + runStreamed: vi + .fn() + .mockRejectedValueOnce(new Error('ThreadError: 429 rate limit exceeded')) + .mockResolvedValueOnce( + events([ + { type: 'turn.started' }, + { type: 'item.completed', item: { type: 'agent_message', text: 'ok' } }, + { type: 'turn.completed' }, + ]), + ), + }; + const runtime = new CodexKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'codex' }, + runner: fakeRunner, + rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never, + }); + + await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok'); + + expect(report).toHaveBeenCalledWith({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' }); + expect(waitForReady).toHaveBeenCalledTimes(2); + expect(fakeRunner.runStreamed).toHaveBeenCalledTimes(2); + }); + + it('surfaces Codex rate-limit failures without retrying when no governor is present', async () => { + const fakeRunner = runner([{ type: 'turn.failed', error: { message: '429 rate limit exceeded' } }]); + const runtime = new CodexKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'codex' }, + runner: fakeRunner, + }); + + await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).rejects.toThrow(/rate limit/i); + expect(fakeRunner.runStreamed).toHaveBeenCalledTimes(1); + }); + + it('passes abort signals into Codex text generation and governor waits', async () => { + const controller = new AbortController(); + const waitForReady = vi.fn().mockResolvedValue(undefined); + let observedSignal: AbortSignal | undefined; + const fakeRunner = { + runStreamed: vi.fn(async (input: { signal?: AbortSignal }) => { + observedSignal = input.signal; + return events([ + { type: 'turn.started' }, + { type: 'item.completed', item: { type: 'agent_message', text: 'ok' } }, + { type: 'turn.completed' }, + ]); + }), + }; + const runtime = new CodexKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'codex' }, + runner: fakeRunner, + rateLimitGovernor: { waitForReady, report: vi.fn(), maxRetryAttempts: () => 6 } as never, + }); + + await expect(runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal })).resolves.toBe('ok'); + + expect(waitForReady).toHaveBeenCalledWith(controller.signal); + expect(observedSignal).toBe(controller.signal); + }); + + it('links the parent abort signal into Codex agent-loop streamed runs', async () => { + const controller = new AbortController(); + let releaseStream!: () => void; + const streamRelease = new Promise((resolve) => { + releaseStream = resolve; + }); + let markRunnerCalled!: () => void; + const runnerCalled = new Promise((resolve) => { + markRunnerCalled = resolve; + }); + let observedSignal: AbortSignal | undefined; + const fakeRunner = { + runStreamed: vi.fn(async (input: { signal?: AbortSignal }) => { + observedSignal = input.signal; + markRunnerCalled(); + return (async function* () { + await streamRelease; + yield { type: 'turn.started' }; + yield { type: 'item.completed', item: { type: 'agent_message', text: 'ok' } }; + yield { type: 'turn.completed' }; + })(); + }), + }; + const runtime = new CodexKtxLlmRuntime({ + projectDir: '/tmp/project', + modelSlots: { default: 'codex' }, + runner: fakeRunner, + }); + + const pending = runtime.runAgentLoop({ + modelRole: 'default', + systemPrompt: '', + userPrompt: '', + toolSet: {}, + stepBudget: 10, + telemetryTags: {}, + abortSignal: controller.signal, + }); + + await runnerCalled; + expect(observedSignal).toBeDefined(); + expect(observedSignal).not.toBe(controller.signal); + controller.abort(); + expect(observedSignal?.aborted).toBe(true); + releaseStream(); + await expect(pending).resolves.toMatchObject({ stopReason: 'natural' }); + }); + it('starts and closes a temporary MCP server for tool-backed agent loops', async () => { const close = vi.fn(async () => undefined); const startMcpServer = vi.fn(async () => ({ diff --git a/packages/cli/test/context/llm/local-config.test.ts b/packages/cli/test/context/llm/local-config.test.ts index e153baaf..eed66261 100644 --- a/packages/cli/test/context/llm/local-config.test.ts +++ b/packages/cli/test/context/llm/local-config.test.ts @@ -7,6 +7,7 @@ import { import { createLocalKtxEmbeddingProviderFromConfig, createLocalKtxLlmProviderFromConfig, + createLocalKtxLlmRuntimeFromConfig, resolveLocalKtxEmbeddingConfig, resolveLocalKtxLlmConfig, } from '../../../src/context/llm/local-config.js'; @@ -129,6 +130,64 @@ describe('local KTX LLM config', () => { vertexFallbackTo5m: false, }); }); + + it('passes the rate-limit governor into created runtimes', () => { + const rateLimitGovernor = {} as never; + const createClaudeCodeRuntime = vi.fn(() => ({ + generateText: vi.fn(), + generateObject: vi.fn(), + runAgentLoop: vi.fn(), + })); + const createCodexRuntime = vi.fn(() => ({ + generateText: vi.fn(), + generateObject: vi.fn(), + runAgentLoop: vi.fn(), + })); + const createAiSdkRuntime = vi.fn(() => ({ + generateText: vi.fn(), + generateObject: vi.fn(), + runAgentLoop: vi.fn(), + })); + const createKtxLlmProvider = vi.fn(() => ({ + getModel: vi.fn(), + getModelByName: vi.fn(), + cacheMarker: vi.fn(), + repairToolCallHandler: vi.fn(), + thinkingProviderOptions: vi.fn(), + telemetryConfig: vi.fn(), + promptCachingConfig: vi.fn(), + activeBackend: vi.fn(() => 'anthropic'), + })); + + createLocalKtxLlmRuntimeFromConfig( + { + provider: { backend: 'claude-code' }, + models: { default: 'sonnet' }, + promptCaching: undefined, + }, + { projectDir: '/tmp/project', env: {}, rateLimitGovernor, createClaudeCodeRuntime }, + ); + createLocalKtxLlmRuntimeFromConfig( + { + provider: { backend: 'codex' }, + models: { default: 'codex' }, + promptCaching: undefined, + }, + { projectDir: '/tmp/project', env: {}, rateLimitGovernor, createCodexRuntime }, + ); + createLocalKtxLlmRuntimeFromConfig( + { + provider: { backend: 'anthropic' }, + models: { default: 'claude-sonnet-4-6' }, + promptCaching: undefined, + }, + { env: {}, rateLimitGovernor, createAiSdkRuntime, createKtxLlmProvider: createKtxLlmProvider as never }, + ); + + expect(createClaudeCodeRuntime).toHaveBeenCalledWith(expect.objectContaining({ rateLimitGovernor })); + expect(createCodexRuntime).toHaveBeenCalledWith(expect.objectContaining({ rateLimitGovernor })); + expect(createAiSdkRuntime).toHaveBeenCalledWith(expect.objectContaining({ rateLimitGovernor })); + }); }); describe('local KTX embedding config', () => { diff --git a/packages/cli/test/context/llm/rate-limit-governor.test.ts b/packages/cli/test/context/llm/rate-limit-governor.test.ts new file mode 100644 index 00000000..51fcba84 --- /dev/null +++ b/packages/cli/test/context/llm/rate-limit-governor.test.ts @@ -0,0 +1,278 @@ +import { describe, expect, it } from 'vitest'; +import { + createRateLimitGovernorConfig, + RateLimitGovernor, + type RateLimitWaitState, +} from '../../../src/context/llm/rate-limit-governor.js'; + +function testClock(startMs = 1_000) { + let nowMs = startMs; + return { + now: () => nowMs, + advance: (ms: number) => { + nowMs += ms; + }, + }; +} + +async function flushMicrotasks(turns = 10): Promise { + for (let i = 0; i < turns; i += 1) { + await Promise.resolve(); + } +} + +describe('RateLimitGovernor', () => { + it('drops and restores the effective work-unit limit from warning signals', () => { + const clock = testClock(); + const states: RateLimitWaitState[] = []; + const governor = new RateLimitGovernor( + createRateLimitGovernorConfig({ maxConcurrency: 6, minConcurrencyUnderPressure: 1 }), + { now: clock.now, sleep: async () => undefined, random: () => 0 }, + ); + governor.subscribe((state) => states.push(state)); + + expect(governor.currentLimit()).toBe(6); + governor.report({ + provider: 'claude-subscription', + status: 'warning', + utilization: 0.91, + rateLimitType: 'five_hour', + }); + expect(governor.currentLimit()).toBe(1); + governor.report({ + provider: 'claude-subscription', + status: 'allowed', + utilization: 0.2, + rateLimitType: 'five_hour', + }); + expect(governor.currentLimit()).toBe(6); + expect(states.map((state) => state.kind)).toContain('concurrency_adjusted'); + }); + + it('blocks work slots during a rejected reset window and emits wait states', async () => { + const clock = testClock(); + const states: RateLimitWaitState[] = []; + const sleeps: number[] = []; + const governor = new RateLimitGovernor( + createRateLimitGovernorConfig({ maxConcurrency: 2, waitStateTickMs: 100 }), + { + now: clock.now, + random: () => 0, + sleep: async (ms) => { + sleeps.push(ms); + clock.advance(ms); + }, + }, + ); + governor.subscribe((state) => states.push(state)); + + governor.report({ provider: 'anthropic-api', status: 'rejected', retryAfterMs: 250, rateLimitType: 'rpm' }); + const release = await governor.acquireWorkSlot(); + release(); + + expect(sleeps).toEqual([100, 100, 50]); + expect(states.some((state) => state.kind === 'wait_started' && state.provider === 'anthropic-api')).toBe(true); + expect(states.some((state) => state.kind === 'wait_finished' && state.provider === 'anthropic-api')).toBe(true); + }); + + it('rejects an interrupted wait without consuming a work slot', async () => { + const clock = testClock(); + let abortListener: (() => void) | undefined; + const governor = new RateLimitGovernor( + createRateLimitGovernorConfig({ maxConcurrency: 1, waitStateTickMs: 100 }), + { + now: clock.now, + random: () => 0, + sleep: async (_ms, signal) => + new Promise((_resolve, reject) => { + abortListener = () => reject(new DOMException('Aborted', 'AbortError')); + signal?.addEventListener('abort', abortListener, { once: true }); + }), + }, + ); + const controller = new AbortController(); + + governor.report({ + provider: 'claude-subscription', + status: 'rejected', + resetAtMs: 2_000, + rateLimitType: 'five_hour', + }); + const pending = governor.acquireWorkSlot(controller.signal); + controller.abort(); + abortListener?.(); + + await expect(pending).rejects.toThrow(/Aborted/); + expect(governor.activeSlots()).toBe(0); + }); + + it('rejects an already-aborted ready wait', async () => { + const governor = new RateLimitGovernor( + createRateLimitGovernorConfig({ maxConcurrency: 1 }), + { sleep: async () => undefined, random: () => 0 }, + ); + const controller = new AbortController(); + controller.abort(); + + await expect(governor.waitForReady(controller.signal)).rejects.toThrow(/Aborted/); + }); + + it('rejects an already-aborted work slot without consuming capacity', async () => { + const governor = new RateLimitGovernor( + createRateLimitGovernorConfig({ maxConcurrency: 1 }), + { sleep: async () => undefined, random: () => 0 }, + ); + const controller = new AbortController(); + controller.abort(); + + await expect(governor.acquireWorkSlot(controller.signal)).rejects.toThrow(/Aborted/); + expect(governor.activeSlots()).toBe(0); + }); + + it('uses bounded opaque backoff for rejected signals without reset hints', async () => { + const clock = testClock(); + const sleeps: number[] = []; + const governor = new RateLimitGovernor( + createRateLimitGovernorConfig({ + maxConcurrency: 1, + retry: { maxAttempts: 3, baseDelayMs: 1_000, maxDelayMs: 60_000, jitter: false }, + }), + { + now: clock.now, + random: () => 0, + sleep: async (ms) => { + sleeps.push(ms); + clock.advance(ms); + }, + }, + ); + + governor.report({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' }); + const release1 = await governor.acquireWorkSlot(); + release1(); + governor.report({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' }); + const release2 = await governor.acquireWorkSlot(); + release2(); + + expect(sleeps).toEqual([1_000, 2_000]); + }); + + it('exposes the configured retry budget and disables outer retries when pacing is off', () => { + const retry = { maxAttempts: 3, baseDelayMs: 1_000, maxDelayMs: 60_000, jitter: false }; + const enabled = new RateLimitGovernor(createRateLimitGovernorConfig({ retry })); + expect(enabled.maxRetryAttempts()).toBe(3); + + const disabled = new RateLimitGovernor(createRateLimitGovernorConfig({ enabled: false, retry })); + expect(disabled.maxRetryAttempts()).toBe(1); + }); + + it('emits visible wait ticks after a rejected report without a waiting caller', async () => { + const clock = testClock(); + const states: RateLimitWaitState[] = []; + const sleeps: number[] = []; + const governor = new RateLimitGovernor( + createRateLimitGovernorConfig({ maxConcurrency: 4, minConcurrencyUnderPressure: 1, waitStateTickMs: 100 }), + { + now: clock.now, + random: () => 0, + sleep: async (ms, signal) => { + if (signal?.aborted) { + throw new DOMException('Aborted', 'AbortError'); + } + sleeps.push(ms); + clock.advance(ms); + }, + }, + ); + governor.subscribe((state) => states.push(state)); + + governor.report({ + provider: 'claude-subscription', + status: 'rejected', + resetAtMs: 1_250, + rateLimitType: 'five_hour', + }); + await flushMicrotasks(); + + expect(sleeps).toEqual([100, 100, 50]); + expect(states).toContainEqual( + expect.objectContaining({ + kind: 'wait_started', + provider: 'claude-subscription', + rateLimitType: 'five_hour', + remainingMs: 250, + }), + ); + expect(states.filter((state) => state.kind === 'wait_tick')).toHaveLength(3); + expect(states).toContainEqual( + expect.objectContaining({ + kind: 'wait_finished', + provider: 'claude-subscription', + rateLimitType: 'five_hour', + remainingMs: 0, + }), + ); + }); + + it('does not duplicate countdown sleeps when a work slot waits during the same pause', async () => { + const clock = testClock(); + const states: RateLimitWaitState[] = []; + const sleeps: number[] = []; + const governor = new RateLimitGovernor( + createRateLimitGovernorConfig({ maxConcurrency: 2, waitStateTickMs: 100 }), + { + now: clock.now, + random: () => 0, + sleep: async (ms, signal) => { + if (signal?.aborted) { + throw new DOMException('Aborted', 'AbortError'); + } + sleeps.push(ms); + clock.advance(ms); + }, + }, + ); + governor.subscribe((state) => states.push(state)); + + governor.report({ provider: 'anthropic-api', status: 'rejected', retryAfterMs: 250, rateLimitType: 'rpm' }); + const pendingRelease = governor.acquireWorkSlot(); + await flushMicrotasks(); + const release = await pendingRelease; + release(); + + expect(sleeps).toEqual([100, 100, 50]); + expect(states.filter((state) => state.kind === 'wait_tick')).toHaveLength(3); + expect(governor.activeSlots()).toBe(0); + }); + + it('stops the visible wait ticker when the last subscriber unsubscribes', async () => { + const clock = testClock(); + let abortCount = 0; + const governor = new RateLimitGovernor( + createRateLimitGovernorConfig({ maxConcurrency: 1, waitStateTickMs: 100 }), + { + now: clock.now, + random: () => 0, + sleep: async (_ms, signal) => + new Promise((_resolve, reject) => { + signal?.addEventListener( + 'abort', + () => { + abortCount += 1; + reject(new DOMException('Aborted', 'AbortError')); + }, + { once: true }, + ); + }), + }, + ); + const unsubscribe = governor.subscribe(() => undefined); + + governor.report({ provider: 'claude-subscription', status: 'rejected', retryAfterMs: 1_000 }); + await flushMicrotasks(1); + unsubscribe(); + await flushMicrotasks(1); + + expect(abortCount).toBe(1); + }); +}); diff --git a/packages/cli/test/context/project/config.test.ts b/packages/cli/test/context/project/config.test.ts index 6027d454..e5911a25 100644 --- a/packages/cli/test/context/project/config.test.ts +++ b/packages/cli/test/context/project/config.test.ts @@ -50,6 +50,17 @@ connections: maxConcurrency: 1, failureMode: 'continue', }, + rateLimit: { + enabled: true, + throttleThreshold: 0.8, + minConcurrencyUnderPressure: 1, + retry: { + maxAttempts: 6, + baseDelayMs: 1_000, + maxDelayMs: 60_000, + jitter: true, + }, + }, profile: false, }, agent: { @@ -163,6 +174,52 @@ ingest: expect(parseKtxProjectConfig('ingest:\n profile: json\n').ingest.profile).toBe('json'); }); + it('defaults ingest rate-limit settings', () => { + const config = buildDefaultKtxProjectConfig(); + expect(config.ingest.rateLimit).toEqual({ + enabled: true, + throttleThreshold: 0.8, + minConcurrencyUnderPressure: 1, + retry: { + maxAttempts: 6, + baseDelayMs: 1_000, + maxDelayMs: 60_000, + jitter: true, + }, + }); + }); + + it('validates ingest rate-limit retry settings', () => { + const config = parseKtxProjectConfig(` +llm: + provider: + backend: none +ingest: + rateLimit: + enabled: true + throttleThreshold: 0.7 + minConcurrencyUnderPressure: 2 + maxWaitMs: 300000 + retry: + maxAttempts: 4 + baseDelayMs: 500 + maxDelayMs: 30000 + jitter: false +`); + expect(config.ingest.rateLimit).toEqual({ + enabled: true, + throttleThreshold: 0.7, + minConcurrencyUnderPressure: 2, + maxWaitMs: 300_000, + retry: { + maxAttempts: 4, + baseDelayMs: 500, + maxDelayMs: 30_000, + jitter: false, + }, + }); + }); + it('parses global Vertex LLM config', () => { const config = parseKtxProjectConfig(` llm: diff --git a/packages/cli/test/telemetry/project-snapshot.test.ts b/packages/cli/test/telemetry/project-snapshot.test.ts index 973ffb08..ce58f40e 100644 --- a/packages/cli/test/telemetry/project-snapshot.test.ts +++ b/packages/cli/test/telemetry/project-snapshot.test.ts @@ -34,6 +34,17 @@ describe('buildProjectStackSnapshotFields', () => { adapters: [], embeddings: { backend: 'sentence-transformers', dimensions: 384 }, workUnits: { stepBudget: 40, maxConcurrency: 1, failureMode: 'continue' }, + rateLimit: { + enabled: true, + throttleThreshold: 0.8, + minConcurrencyUnderPressure: 1, + retry: { + maxAttempts: 6, + baseDelayMs: 1_000, + maxDelayMs: 60_000, + jitter: true, + }, + }, profile: false, }, llm: { provider: { backend: 'none' }, models: {}, promptCaching: {} },