feat(cli): add ingest LLM rate-limit governor with paced retries (#261)

* feat(cli): add ingest rate limit governor

* feat(cli): wire ingest rate-limit config

* feat(cli): report provider rate-limit signals

* feat(cli): show ingest rate-limit waits

* fix(cli): complete rate-limit event coverage

* fix(cli): abort ingest provider calls cleanly

* fix(cli): propagate ingest cancellation

* fix(cli): reject pre-aborted ingest rate-limit waits

* fix(cli): honor Claude rate-limit reset waits

* fix(cli): retry thrown Codex rate-limit failures

* fix(cli): type Claude rate-limit result details

* fix(cli): emit ingest rate-limit countdowns from rejected signals

* fix(cli): report ai sdk rate-limit header utilization

* fix(cli): gate LLM rate-limit retries on the governor budget

The AI SDK and Codex runtimes retried 429 / opaque rate-limit failures up
to 6-7 times with no backoff when constructed without a RateLimitGovernor
(scan, memory, setup) or with pacing disabled, ignoring Retry-After and
worsening the limit. The outer retry loop only cooperates with the
governor's pause, so without active pacing there is no backoff to apply.

Route the retry bound through a single source: RateLimitGovernor
.maxRetryAttempts(), which returns retry.maxAttempts when enabled and 1
(no outer retry) when absent or disabled. All three runtimes (ai-sdk,
codex, claude-code) now use it, so ingest.rateLimit.retry.maxAttempts
genuinely controls attempts and the hard-coded 6 (plus Codex's off-by-one
extra attempt) is gone. Backend-native retry (e.g. the AI SDK's maxRetries)
still handles transient 429s.

Also correct the ktx.yaml docs for maxWaitMs (caps each wait, not the whole
run) and maxAttempts, and sync uv.lock ktx-sl/ktx-daemon to 0.9.0.
This commit is contained in:
Andrey Avtomonov 2026-06-05 12:10:27 +02:00 committed by GitHub
parent 5a8821073b
commit c3d8cedb0b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
35 changed files with 2336 additions and 72 deletions

View file

@ -0,0 +1,39 @@
/** @internal */
export function createAbortError(message = 'Aborted'): DOMException {
return new DOMException(message, 'AbortError');
}
export function isAbortError(error: unknown): boolean {
if (error instanceof DOMException && error.name === 'AbortError') {
return true;
}
if (!error || typeof error !== 'object') {
return false;
}
const record = error as { name?: unknown; code?: unknown };
return record.name === 'AbortError' || record.code === 'ABORT_ERR';
}
/** @internal */
export function throwIfAborted(signal?: AbortSignal): void {
if (signal?.aborted) {
throw createAbortError();
}
}
export function linkAbortSignal(parent?: AbortSignal): { controller: AbortController; dispose: () => void } {
const controller = new AbortController();
if (!parent) {
return { controller, dispose: () => undefined };
}
if (parent.aborted) {
controller.abort(createAbortError());
return { controller, dispose: () => undefined };
}
const onAbort = () => controller.abort(createAbortError());
parent.addEventListener('abort', onAbort, { once: true });
return {
controller,
dispose: () => parent.removeEventListener('abort', onAbort),
};
}

View file

@ -40,6 +40,7 @@ export interface CuratorPaginationInput {
buildToolSet: (passNumber: number) => KtxRuntimeToolSet;
getReconciliationActions: () => MemoryAction[];
onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void;
abortSignal?: AbortSignal;
}
interface CuratorPaginationResult extends ReconciliationOutcome {
@ -243,6 +244,7 @@ export class CuratorPaginationService implements CuratorPaginationPort {
sourceKey: params.input.sourceKey,
jobId: params.input.jobId,
forceRun: params.forceRun,
abortSignal: params.input.abortSignal,
onStepFinish: params.input.onStepFinish
? ({ stepIndex, stepBudget }) =>
params.input.onStepFinish?.({ passNumber: params.passNumber, stepIndex, stepBudget })

View file

@ -21,6 +21,7 @@ export interface RepairFinalGateFailureInput {
repairKind: FinalGateRepairKind;
maxAttempts?: number;
stepBudget?: number;
abortSignal?: AbortSignal;
}
const readRepairFileSchema = z.object({
@ -200,6 +201,7 @@ export async function repairFinalGateFailure(
jobId: input.trace.context.jobId,
repairKind: input.repairKind,
},
abortSignal: input.abortSignal,
}),
);

View file

@ -3,6 +3,7 @@ import { dirname, join } from 'node:path';
import pLimit from 'p-limit';
import { z } from 'zod';
import { type KtxLogger, noopLogger } from '../../context/core/config.js';
import type { RateLimitWaitState } from '../../context/llm/rate-limit-governor.js';
import { createRuntimeToolDescriptorFromAiTool } from '../../context/llm/runtime-tools.js';
import type { KtxRuntimeToolSet } from '../../context/llm/runtime-port.js';
import type { CaptureSession, MemoryAction } from '../../context/memory/types.js';
@ -219,6 +220,10 @@ export class IngestBundleRunner {
}
async run(job: IngestBundleJob, ctx?: IngestJobContext): Promise<IngestBundleResult> {
const unsubscribeRateLimitGovernor = this.subscribeRateLimitGovernor({
trace: this.createTrace(job),
memoryFlow: ctx?.memoryFlow,
});
const key = job.connectionId;
const previous = this.chainByConnection.get(key);
if (previous) {
@ -241,10 +246,72 @@ export class IngestBundleRunner {
ctx?.memoryFlow?.finish('error', [sanitizeMemoryFlowError(error)]);
throw error;
} finally {
unsubscribeRateLimitGovernor();
await this.maybeEmitIngestProfile(job.jobId);
}
}
private formatRateLimitWait(
state: Extract<RateLimitWaitState, { kind: 'wait_tick' | 'wait_started' | 'wait_finished' }>,
): string {
const seconds = Math.ceil(state.remainingMs / 1_000);
const minutes = Math.floor(seconds / 60);
const remainder = seconds % 60;
const duration = minutes > 0 ? `${minutes}m${String(remainder).padStart(2, '0')}s` : `${seconds}s`;
const type = state.rateLimitType ? ` ${state.rateLimitType}` : '';
return `Rate-limited (${state.provider}${type}); resuming in ${duration}; Ctrl+C to stop`;
}
private subscribeRateLimitGovernor(input: {
trace: IngestTraceWriter;
memoryFlow?: MemoryFlowEventSink;
}): () => void {
const governor = this.deps.settings.rateLimitGovernor;
if (!governor) {
return () => undefined;
}
return governor.subscribe((state: RateLimitWaitState) => {
if (state.kind === 'rate_limit_observed') {
void input.trace.event('info', 'rate_limit', 'rate_limit_observed', { ...state });
return;
}
if (state.kind === 'concurrency_adjusted') {
void input.trace.event('info', 'rate_limit', 'concurrency_adjusted', { ...state });
return;
}
void input.trace.event('info', 'rate_limit', state.kind, { ...state });
if (state.kind === 'wait_tick' || state.kind === 'wait_started') {
input.memoryFlow?.emit({
type: 'rate_limit_wait',
provider: state.provider,
...(state.rateLimitType ? { rateLimitType: state.rateLimitType } : {}),
resumeAtMs: state.resumeAtMs,
remainingMs: state.remainingMs,
});
input.memoryFlow?.emit({
type: 'stage_progress',
stage: 'integration',
percent: 50,
message: this.formatRateLimitWait(state),
transient: true,
});
}
});
}
private async withRateLimitWorkSlot<T>(abortSignal: AbortSignal | undefined, fn: () => Promise<T>): Promise<T> {
const governor = this.deps.settings.rateLimitGovernor;
if (!governor) {
return fn();
}
const release = await governor.acquireWorkSlot(abortSignal);
try {
return await fn();
} finally {
release();
}
}
/**
* When profiling is enabled via the `KTX_PROFILE_INGEST` env var or the
* `ingest.profile` config setting read the job's trace + tool transcripts
@ -877,6 +944,7 @@ export class IngestBundleRunner {
includeContextEvidenceTools: boolean;
currentTableExists(tableRef: string): Promise<boolean>;
memoryFlow?: MemoryFlowEventSink;
abortSignal?: AbortSignal;
wuSkillNames: string[];
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
}): Promise<WorkUnitOutcome> {
@ -1029,6 +1097,7 @@ export class IngestBundleRunner {
jobId: input.job.jobId,
toolFailureCount: (unitKey) => input.transcriptSummaries.get(unitKey)?.fatalErrorCount ?? 0,
onStepFinish: input.onStepFinish,
abortSignal: input.abortSignal,
},
input.wu,
);
@ -1524,7 +1593,8 @@ export class IngestBundleRunner {
try {
await Promise.all(
workUnits.map((wu, index) =>
limitWorkUnit(async () => {
limitWorkUnit(() =>
this.withRateLimitWorkSlot(ctx?.abortSignal, async () => {
const outcome = await runIsolatedWorkUnit({
unitIndex: index,
ingestionBaseSha,
@ -1532,6 +1602,7 @@ export class IngestBundleRunner {
patchDir,
trace: runTrace,
workUnit: wu,
abortSignal: ctx?.abortSignal,
afterSuccess: (child) => copyTransientIngestEvidence(child.workdir, sessionWorktree.workdir),
run: async (child) => {
const scopedWikiService = this.deps.wikiService.forWorktree(child.workdir);
@ -1565,6 +1636,7 @@ export class IngestBundleRunner {
includeContextEvidenceTools: adapter.evidenceIndexing === 'documents' && !!contextReport,
currentTableExists: (tableRef) =>
this.tableRefExistsInSemanticLayer(scopedSemanticLayerService, slConnectionIds, tableRef),
abortSignal: ctx?.abortSignal,
memoryFlow,
wuSkillNames,
onStepFinish: ({ stepIndex, stepBudget }) => {
@ -1594,7 +1666,8 @@ export class IngestBundleRunner {
completedWorkUnits / workUnits.length,
`${completedWorkUnits} of ${workUnits.length} work units complete`,
);
}),
}),
),
),
);
} catch (error) {
@ -1693,6 +1766,7 @@ export class IngestBundleRunner {
reason: context.reason,
maxAttempts: 1,
stepBudget: 12,
abortSignal: ctx?.abortSignal,
});
emitStageProgress(
'integration',
@ -1714,6 +1788,7 @@ export class IngestBundleRunner {
repairKind: 'patch_semantic_gate',
maxAttempts: 1,
stepBudget: 16,
abortSignal: ctx?.abortSignal,
});
emitStageProgress(
'integration',
@ -1993,6 +2068,7 @@ export class IngestBundleRunner {
);
}
: undefined,
abortSignal: ctx?.abortSignal,
});
curatorReport = curatorOutcome.report;
curatorWarnings = curatorOutcome.warnings;
@ -2038,6 +2114,7 @@ export class IngestBundleRunner {
sourceKey: job.sourceKey,
jobId: job.jobId,
force: !!overrideReport,
abortSignal: ctx?.abortSignal,
onStepFinish: stage4
? ({ stepIndex, stepBudget }) => {
emitStageProgress('reconciliation', 85, `Reconciling results: step ${stepIndex}/${stepBudget}`, {
@ -2470,6 +2547,7 @@ export class IngestBundleRunner {
repairKind: 'final_artifact_gate',
maxAttempts: 1,
stepBudget: 16,
abortSignal: ctx?.abortSignal,
});
isolatedDiffSummary.gateRepairAttempts += gateRepair.attempts;

View file

@ -19,6 +19,7 @@ export interface ResolveTextualConflictInput {
reason: string;
maxAttempts?: number;
stepBudget?: number;
abortSignal?: AbortSignal;
}
const readIntegrationFileSchema = z.object({
@ -208,6 +209,7 @@ export async function resolveTextualConflict(
jobId: input.trace.context.jobId,
unitKey: input.unitKey,
},
abortSignal: input.abortSignal,
}),
);

View file

@ -14,6 +14,7 @@ export interface RunIsolatedWorkUnitInput {
patchDir: string;
trace: IngestTraceWriter;
workUnit: WorkUnit;
abortSignal?: AbortSignal;
run(child: IngestSessionWorktree): Promise<WorkUnitOutcome>;
afterSuccess?(child: IngestSessionWorktree): Promise<void>;
}

View file

@ -12,6 +12,7 @@ import type { KtxSemanticLayerComputePort } from '../../context/daemon/semantic-
import { createRuntimeToolDescriptorFromAiTool } from '../../context/llm/runtime-tools.js';
import { createLocalKtxLlmRuntimeFromConfig } from '../../context/llm/local-config.js';
import { KtxIngestEmbeddingPortAdapter } from '../../context/llm/embedding-port.js';
import { createRateLimitGovernorConfig, RateLimitGovernor } from '../../context/llm/rate-limit-governor.js';
import { RuntimeAgentRunner, type AgentRunnerPort, type KtxLlmRuntimePort, type KtxRuntimeToolSet } from '../../context/llm/runtime-port.js';
import type { KtxEmbeddingProvider } from '../../llm/types.js';
import type { KtxLocalProject } from '../../context/project/project.js';
@ -619,7 +620,7 @@ function localIngestLlmProviderGuardMessage(projectDir: string): string {
].join('\n');
}
function resolveAgentRunner(options: CreateLocalBundleIngestRuntimeOptions): {
function resolveAgentRunner(options: CreateLocalBundleIngestRuntimeOptions, rateLimitGovernor: RateLimitGovernor): {
agentRunner: AgentRunnerPort;
llmRuntime?: KtxLlmRuntimePort;
} {
@ -628,6 +629,7 @@ function resolveAgentRunner(options: CreateLocalBundleIngestRuntimeOptions): {
(options.createLlmRuntime ?? createLocalKtxLlmRuntimeFromConfig)(options.project.config.llm, {
projectDir: options.project.projectDir,
env: process.env,
rateLimitGovernor,
}) ??
undefined;
@ -677,7 +679,13 @@ export function createLocalBundleIngestRuntime(
const knowledgeIndex = new LocalKnowledgeIndex(options.project, embedding);
const knowledgeEvents = new NoopKnowledgeEventPort();
const wikiService = new KnowledgeWikiService(rootFileStore, embedding, knowledgeIndex, options.project.git, logger);
const { agentRunner, llmRuntime } = resolveAgentRunner(options);
const rateLimitGovernor = new RateLimitGovernor(
createRateLimitGovernorConfig({
...options.project.config.ingest.rateLimit,
maxConcurrency: options.project.config.ingest.workUnits.maxConcurrency,
}),
);
const { agentRunner, llmRuntime } = resolveAgentRunner(options, rateLimitGovernor);
const promptService = new PromptService({ promptsDir, partials: [], logger });
const storage = new LocalIngestStorage(options.project);
const registry = registerAdapters(options.adapters);
@ -717,6 +725,7 @@ export function createLocalBundleIngestRuntime(
workUnitMaxConcurrency: options.project.config.ingest.workUnits.maxConcurrency,
workUnitStepBudget: options.project.config.ingest.workUnits.stepBudget,
workUnitFailureMode: options.project.config.ingest.workUnits.failureMode,
rateLimitGovernor,
profileIngest: options.project.config.ingest.profile,
ingestTraceLevel: ingestTraceLevelFromEnv(),
},

View file

@ -3,6 +3,7 @@ import { cp, mkdir, rm } from 'node:fs/promises';
import { isAbsolute, resolve } from 'node:path';
import type { KtxSqlQueryExecutorPort } from '../../context/connections/query-executor.js';
import type { KtxLogger } from '../../context/core/config.js';
import { createAbortError, isAbortError } from '../../context/core/abort.js';
import type { KtxSemanticLayerComputePort } from '../../context/daemon/semantic-layer-compute.js';
import type { AgentRunnerPort, KtxLlmRuntimePort } from '../../context/llm/runtime-port.js';
import type { KtxLocalProject } from '../../context/project/project.js';
@ -36,6 +37,7 @@ export interface RunLocalIngestOptions {
queryExecutor?: KtxSqlQueryExecutorPort;
logger?: KtxLogger;
embeddingProvider?: import('../../llm/types.js').KtxEmbeddingProvider | null;
abortSignal?: AbortSignal;
}
export interface LocalIngestResult {
@ -123,10 +125,11 @@ function findAdapter(adapters: SourceAdapter[], source: string): SourceAdapter {
return adapter;
}
function localJobContext(jobId: string, memoryFlow?: MemoryFlowEventSink): IngestJobContext {
function localJobContext(jobId: string, memoryFlow?: MemoryFlowEventSink, abortSignal?: AbortSignal): IngestJobContext {
return {
jobId,
...(memoryFlow ? { memoryFlow } : {}),
...(abortSignal ? { abortSignal } : {}),
startPhase() {
return new LocalIngestPhase();
},
@ -158,6 +161,7 @@ async function runScheduledPullJob(options: {
queryExecutor?: KtxSqlQueryExecutorPort;
logger?: KtxLogger;
embeddingProvider?: import('../../llm/types.js').KtxEmbeddingProvider | null;
abortSignal?: AbortSignal;
}): Promise<LocalIngestResult> {
const runtime = createLocalBundleIngestRuntime(options);
const jobId = options.jobId ?? runtime.nextJobId();
@ -169,7 +173,7 @@ async function runScheduledPullJob(options: {
trigger: options.trigger ?? 'manual_resync',
bundleRef: { kind: 'scheduled_pull', config: options.pullConfig },
},
localJobContext(jobId, options.memoryFlow),
localJobContext(jobId, options.memoryFlow, options.abortSignal),
);
const report = await runtime.store.findByJobId(jobId);
if (!report) {
@ -212,6 +216,7 @@ export async function runLocalIngest(options: RunLocalIngestOptions): Promise<Lo
queryExecutor: options.queryExecutor,
logger: options.logger,
embeddingProvider: options.embeddingProvider,
abortSignal: options.abortSignal,
});
}
@ -223,7 +228,7 @@ export async function runLocalIngest(options: RunLocalIngestOptions): Promise<Lo
trigger: options.trigger ?? (options.sourceDir ? 'upload' : 'manual_resync'),
bundleRef,
},
localJobContext(jobId, options.memoryFlow),
localJobContext(jobId, options.memoryFlow, options.abortSignal),
);
const report = await runtime.store.findByJobId(jobId);
if (!report) {
@ -362,6 +367,9 @@ export async function runLocalMetabaseIngest(
const children: LocalMetabaseFanoutChild[] = [];
for (const childPlan of childPlans) {
if (options.abortSignal?.aborted) {
throw createAbortError();
}
const targetConnectionId = safeSegment('target connection id', childPlan.targetConnectionId);
if (!options.project.config.connections[targetConnectionId]) {
throw new Error(`Target connection "${targetConnectionId}" is not configured in ktx.yaml`);
@ -391,8 +399,12 @@ export async function runLocalMetabaseIngest(
queryExecutor: options.queryExecutor,
logger: options.logger,
embeddingProvider: options.embeddingProvider,
abortSignal: options.abortSignal,
});
} catch (error) {
if (isAbortError(error)) {
throw error;
}
child = await recordLocalMetabaseChildFailure({
project: options.project,
jobId: childJobId,

View file

@ -70,6 +70,13 @@ const memoryFlowEventSchema = z.discriminatedUnion('type', [
message: z.string().min(1),
transient: z.boolean().optional(),
}),
eventSchema({
type: z.literal('rate_limit_wait'),
provider: z.string(),
rateLimitType: z.string().optional(),
resumeAtMs: z.number().int().nonnegative(),
remainingMs: z.number().int().nonnegative(),
}),
eventSchema({
type: z.literal('work_unit_started'),
unitKey: z.string().min(1),

View file

@ -60,6 +60,13 @@ type MemoryFlowEventPayload =
message: string;
transient?: boolean;
}
| {
type: 'rate_limit_wait';
provider: string;
rateLimitType?: string;
resumeAtMs: number;
remainingMs: number;
}
| {
type: 'work_unit_started';
unitKey: string;

View file

@ -5,6 +5,7 @@ import type { KtxFileStorePort } from '../../context/core/file-store.js';
import type { KtxLogger } from '../../context/core/config.js';
import type { SessionOutcome } from '../../context/core/session-worktree.service.js';
import type { AgentRunnerPort, KtxLlmRuntimePort, KtxRuntimeToolSet } from '../../context/llm/runtime-port.js';
import type { RateLimitGovernor } from '../llm/rate-limit-governor.js';
import type { MemoryAction, MemoryKnowledgeSlRefsPort } from '../../context/memory/types.js';
import type { PromptService } from '../../context/prompts/prompt.service.js';
import type { SkillsRegistryService } from '../../context/skills/skills-registry.service.js';
@ -144,6 +145,7 @@ interface IngestSettingsPort {
workUnitMaxConcurrency?: number;
workUnitStepBudget?: number;
workUnitFailureMode?: 'abort' | 'continue';
rateLimitGovernor?: RateLimitGovernor;
/** Print a timing breakdown to stderr at the end of each run (config-driven; see also KTX_PROFILE_INGEST). `'json'` emits the raw structured profile. */
profileIngest?: boolean | 'json';
ingestTraceLevel?: IngestTraceLevel;
@ -323,6 +325,7 @@ export interface CuratorPaginationPort {
buildToolSet: (passNumber: number) => KtxRuntimeToolSet;
getReconciliationActions: () => MemoryAction[];
onStepFinish?: (info: { passNumber: number; stepIndex: number; stepBudget: number }) => void;
abortSignal?: AbortSignal;
}): Promise<ReconciliationOutcome & { report: CuratorPaginationReport; warnings: string[] }>;
}

View file

@ -1,4 +1,5 @@
import type { KtxModelRole } from '../../../llm/types.js';
import { isAbortError } from '../../core/abort.js';
import type { AgentRunnerPort, KtxRuntimeToolSet, RunLoopMetrics } from '../../../context/llm/runtime-port.js';
import type { CaptureSession, MemoryAction } from '../../../context/memory/types.js';
import { listTouchedSlSources, type TouchedSlSource } from '../../../context/tools/touched-sl-sources.js';
@ -28,6 +29,7 @@ export interface WorkUnitExecutionDeps {
connectionId: string;
jobId: string;
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
abortSignal?: AbortSignal;
toolFailureCount?: (unitKey: string) => number;
}
@ -106,8 +108,12 @@ export async function executeWorkUnit(deps: WorkUnitExecutionDeps, wu: WorkUnit)
jobId: deps.jobId,
},
onStepFinish: deps.onStepFinish,
abortSignal: deps.abortSignal,
});
} catch (error) {
if (isAbortError(error)) {
throw error;
}
return failWithResetFromCurrentHead(error instanceof Error ? error.message : String(error));
}

View file

@ -16,6 +16,7 @@ export interface ReconciliationContext {
jobId: string;
force?: boolean;
onStepFinish?: (info: { stepIndex: number; stepBudget: number }) => void;
abortSignal?: AbortSignal;
forceRun?: boolean;
}
@ -40,6 +41,7 @@ export async function runReconciliationStage4(ctx: ReconciliationContext): Promi
stepBudget: ctx.stepBudget,
telemetryTags: { operationName: 'ingest-bundle-reconcile', source: ctx.sourceKey, jobId: ctx.jobId },
onStepFinish: ctx.onStepFinish,
abortSignal: ctx.abortSignal,
});
return { skipped: false, stopReason: run.stopReason, error: run.error, ...(run.metrics ? { metrics: run.metrics } : {}) };
}

View file

@ -220,5 +220,6 @@ export interface IngestJobPhase {
export interface IngestJobContext {
jobId: string;
memoryFlow?: MemoryFlowEventSink;
abortSignal?: AbortSignal;
startPhase(weight: number): IngestJobPhase;
}

View file

@ -3,7 +3,9 @@ import type { KtxLlmProvider } from '../../llm/types.js';
import { generateText, Output, stepCountIs, type FlexibleSchema, type TelemetrySettings, type ToolSet } from 'ai';
import type { z } from 'zod';
import { noopLogger, type KtxLogger } from '../../context/core/config.js';
import { isAbortError } from '../core/abort.js';
import { summarizeKtxLlmDebugRequest, type KtxLlmDebugRequestRecorder } from './debug-request-recorder.js';
import type { RateLimitGovernor, RateLimitProvider, RateLimitSignal } from './rate-limit-governor.js';
import { createAiSdkToolSet } from './runtime-tools.js';
import type {
KtxGenerateObjectInput,
@ -40,12 +42,129 @@ export interface AiSdkKtxLlmRuntimeDeps {
telemetry?: AgentTelemetryPort;
logger?: KtxLogger;
debugRequestRecorder?: KtxLlmDebugRequestRecorder;
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
}
function hasTools(tools: Record<string, unknown>): boolean {
return Object.keys(tools).length > 0;
}
function modelProviderName(model: unknown): RateLimitProvider {
const provider = (model as { provider?: string }).provider ?? '';
return provider.includes('vertex') || provider.includes('google') ? 'vertex' : 'anthropic-api';
}
interface HeaderLimitPair {
limit: string;
remaining: string;
rateLimitType: string;
}
const RATE_LIMIT_HEADER_PAIRS: HeaderLimitPair[] = [
{
limit: 'anthropic-ratelimit-requests-limit',
remaining: 'anthropic-ratelimit-requests-remaining',
rateLimitType: 'rpm',
},
{
limit: 'anthropic-ratelimit-tokens-limit',
remaining: 'anthropic-ratelimit-tokens-remaining',
rateLimitType: 'tpm',
},
{
limit: 'anthropic-ratelimit-input-tokens-limit',
remaining: 'anthropic-ratelimit-input-tokens-remaining',
rateLimitType: 'itpm',
},
{
limit: 'anthropic-ratelimit-output-tokens-limit',
remaining: 'anthropic-ratelimit-output-tokens-remaining',
rateLimitType: 'otpm',
},
{
limit: 'x-ratelimit-limit-requests',
remaining: 'x-ratelimit-remaining-requests',
rateLimitType: 'rpm',
},
{
limit: 'x-ratelimit-limit-tokens',
remaining: 'x-ratelimit-remaining-tokens',
rateLimitType: 'tpm',
},
];
function normalizeHeaders(headers: unknown): Record<string, string> {
if (!headers || typeof headers !== 'object') {
return {};
}
const get = (headers as { get?: unknown }).get;
if (typeof get === 'function') {
const out: Record<string, string> = {};
for (const pair of RATE_LIMIT_HEADER_PAIRS) {
const limit = get.call(headers, pair.limit);
const remaining = get.call(headers, pair.remaining);
if (typeof limit === 'string') out[pair.limit] = limit;
if (typeof remaining === 'string') out[pair.remaining] = remaining;
}
return out;
}
return Object.fromEntries(
Object.entries(headers as Record<string, unknown>)
.filter((entry): entry is [string, string | number] => typeof entry[1] === 'string' || typeof entry[1] === 'number')
.map(([key, value]) => [key.toLowerCase(), String(value)]),
);
}
function numericHeader(headers: Record<string, string>, key: string): number | undefined {
const value = Number(headers[key]);
return Number.isFinite(value) && value >= 0 ? value : undefined;
}
function utilizationForPair(headers: Record<string, string>, pair: HeaderLimitPair): number | undefined {
const limit = numericHeader(headers, pair.limit);
const remaining = numericHeader(headers, pair.remaining);
if (limit === undefined || remaining === undefined || limit <= 0) {
return undefined;
}
return 1 - Math.min(limit, remaining) / limit;
}
function aiSdkHeaderRateLimitSignal(provider: RateLimitProvider, result: unknown): RateLimitSignal | undefined {
const headers = normalizeHeaders((result as { response?: { headers?: unknown } }).response?.headers);
let best: { utilization: number; rateLimitType: string } | undefined;
for (const pair of RATE_LIMIT_HEADER_PAIRS) {
const utilization = utilizationForPair(headers, pair);
if (utilization === undefined) {
continue;
}
if (!best || utilization > best.utilization) {
best = { utilization, rateLimitType: pair.rateLimitType };
}
}
if (!best) {
return undefined;
}
return {
provider,
status: 'allowed',
rateLimitType: best.rateLimitType,
utilization: Number(best.utilization.toFixed(4)),
};
}
function retryAfterMs(error: unknown): number | undefined {
const value = (error as { retryAfter?: unknown }).retryAfter;
if (typeof value === 'number' && Number.isFinite(value) && value > 0) {
return value < 1_000 ? value * 1_000 : value;
}
return undefined;
}
function isAiSdkRateLimitError(error: unknown): boolean {
const record = error as { name?: string; statusCode?: number; status?: number };
return record.name === 'TooManyRequestsError' || record.statusCode === 429 || record.status === 429;
}
export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
private readonly logger: KtxLogger;
@ -53,6 +172,41 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
this.logger = deps.logger ?? noopLogger;
}
private async generateTextWithRateLimitRetry<T>(
provider: RateLimitProvider,
abortSignal: AbortSignal | undefined,
run: () => Promise<T>,
): Promise<T> {
// maxRetryAttempts() returns 1 when no governor is present or pacing is
// disabled, so a 429 throws immediately instead of hammering the provider
// with no backoff; the AI SDK's own maxRetries still handles transient 429s.
const maxAttempts = this.deps.rateLimitGovernor?.maxRetryAttempts() ?? 1;
let attempt = 0;
while (true) {
await this.deps.rateLimitGovernor?.waitForReady(abortSignal);
try {
const result = await run();
const signal = aiSdkHeaderRateLimitSignal(provider, result);
if (signal) {
this.deps.rateLimitGovernor?.report(signal);
}
return result;
} catch (error) {
if (isAbortError(error) || !isAiSdkRateLimitError(error) || attempt >= maxAttempts - 1) {
throw error;
}
attempt += 1;
const retryAfter = retryAfterMs(error);
this.deps.rateLimitGovernor?.report({
provider,
status: 'rejected',
rateLimitType: 'http_429',
...(retryAfter !== undefined ? { retryAfterMs: retryAfter } : {}),
});
}
}
}
async generateText(input: KtxGenerateTextInput): Promise<string> {
const model = this.deps.llmProvider.getModel(input.role);
if ((model as { provider?: string }).provider === 'deterministic') {
@ -67,12 +221,13 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
});
const split = splitKtxSystemMessages(built.messages);
const startedAt = Date.now();
const result = await generateText({
const request = {
model,
temperature: input.temperature ?? 0,
...(split.system ? { system: split.system } : {}),
messages: split.messages,
tools: built.tools as ToolSet,
...(input.abortSignal ? { abortSignal: input.abortSignal } : {}),
...(hasTools(tools)
? {
experimental_repairToolCall: this.deps.llmProvider.repairToolCallHandler({
@ -80,7 +235,8 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
}),
}
: {}),
});
};
const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), input.abortSignal, () => generateText(request));
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: toLlmTokenUsage(result.totalUsage ?? result.usage) });
if (typeof result.text !== 'string') {
throw new Error('KTX LLM text generation returned no text');
@ -101,12 +257,13 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
});
const split = splitKtxSystemMessages(built.messages);
const startedAt = Date.now();
const result = await generateText({
const request = {
model,
temperature: input.temperature ?? 0,
...(split.system ? { system: split.system } : {}),
messages: split.messages,
tools: built.tools as ToolSet,
...(input.abortSignal ? { abortSignal: input.abortSignal } : {}),
...(hasTools(tools)
? {
experimental_repairToolCall: this.deps.llmProvider.repairToolCallHandler({
@ -115,7 +272,8 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
}
: {}),
output: Output.object({ schema: input.schema as unknown as FlexibleSchema<TOutput> }),
});
};
const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), input.abortSignal, () => generateText(request));
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: toLlmTokenUsage(result.totalUsage ?? result.usage) });
if (result.output == null) {
throw new Error('KTX LLM object generation returned no output');
@ -152,7 +310,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
}),
);
const result = await generateText({
const request = {
model,
temperature: 0,
stopWhen: stepCountIs(params.stepBudget),
@ -163,6 +321,7 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
...(promptMessages.system ? { system: promptMessages.system } : {}),
messages: promptMessages.messages,
tools: built.tools as ToolSet,
...(params.abortSignal ? { abortSignal: params.abortSignal } : {}),
onStepFinish: async () => {
stepIndex += 1;
stepBoundariesMs.push(Date.now() - startedAt);
@ -179,7 +338,8 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
);
}
},
});
};
const result = await this.generateTextWithRateLimitRetry(modelProviderName(model), params.abortSignal, () => generateText(request));
return {
stopReason: 'natural',
metrics: {
@ -190,6 +350,9 @@ export class AiSdkKtxLlmRuntime implements KtxLlmRuntimePort {
},
};
} catch (error) {
if (isAbortError(error)) {
throw error;
}
const err = error instanceof Error ? error : new Error(String(error));
this.logger.warn(`[agent-runner] loop failed: ${err.message}`);
return {

View file

@ -7,8 +7,10 @@ import {
} from '@anthropic-ai/claude-agent-sdk';
import { z } from 'zod';
import { noopLogger, type KtxLogger } from '../../context/core/config.js';
import { createAbortError, isAbortError, throwIfAborted } from '../core/abort.js';
import { createKtxClaudeCodeEnv } from './claude-code-env.js';
import { resolveClaudeCodeModel } from './claude-code-models.js';
import type { RateLimitGovernor, RateLimitSignal } from './rate-limit-governor.js';
import { createClaudeSdkTools, mcpToolIds } from './runtime-tools.js';
import type {
KtxGenerateObjectInput,
@ -21,7 +23,16 @@ import type {
RunLoopStopReason,
} from './runtime-port.js';
type QueryFn = (params: Parameters<typeof defaultQuery>[0]) => AsyncIterable<SDKMessage>;
type QueryResult = AsyncIterable<SDKMessage> & {
interrupt?: () => void | Promise<void>;
};
type QueryFn = (params: Parameters<typeof defaultQuery>[0]) => QueryResult;
interface ClaudeQueryOutcome {
result: SDKResultMessage;
rejectedRateLimitSignal?: RateLimitSignal;
}
function claudeTokenUsage(result: SDKResultMessage): LlmTokenUsage {
const usage = (result as { usage?: { input_tokens?: number; output_tokens?: number } }).usage;
@ -43,6 +54,7 @@ export interface ClaudeCodeKtxLlmRuntimeDeps {
query?: QueryFn;
env?: NodeJS.ProcessEnv;
logger?: KtxLogger;
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
}
const BUILTIN_TOOLS = [
@ -157,6 +169,74 @@ function expectedMcpServerNames(tools: KtxRuntimeToolSet | undefined): Set<strin
return tools && Object.keys(tools).length > 0 ? new Set([KTX_MCP_SERVER_NAME]) : new Set();
}
const CLAUDE_RATE_LIMIT_ERROR_MARKERS = /\b429\b|rate limit|too many requests|quota exceeded|overloaded|max_retries/i;
function normalizeClaudeResetAtMs(value: unknown): number | undefined {
if (typeof value === 'number' && Number.isFinite(value) && value > 0) {
return Math.round(value < 10_000_000_000 ? value * 1_000 : value);
}
if (typeof value === 'string') {
const numeric = Number(value);
if (Number.isFinite(numeric) && numeric > 0) {
return normalizeClaudeResetAtMs(numeric);
}
const parsed = Date.parse(value);
return Number.isFinite(parsed) ? parsed : undefined;
}
return undefined;
}
function isClaudeRateLimitResult(result: SDKResultMessage, rejectedSignal: RateLimitSignal | undefined): boolean {
const error = resultError(result);
if (!error) {
return false;
}
if (rejectedSignal?.status === 'rejected') {
return true;
}
const resultDetails = result as {
stop_reason?: unknown;
terminal_reason?: unknown;
errors?: unknown[];
};
const details = [
error.message,
resultDetails.stop_reason,
resultDetails.terminal_reason,
...(resultDetails.errors ?? []),
]
.filter((value): value is string => typeof value === 'string' && value.length > 0)
.join('\n');
return CLAUDE_RATE_LIMIT_ERROR_MARKERS.test(details);
}
function claudeRateLimitSignal(message: SDKMessage): RateLimitSignal | null {
const record = message as unknown as Record<string, unknown>;
if (record.type === 'rate_limit_event') {
const info = record.rate_limit_info as Record<string, unknown> | undefined;
if (!info) return null;
const rawStatus = typeof info.status === 'string' ? info.status : 'allowed';
const resetAtMs = normalizeClaudeResetAtMs(info.resetsAt);
return {
provider: 'claude-subscription',
status: rawStatus === 'rejected' ? 'rejected' : rawStatus === 'allowed_warning' ? 'warning' : 'allowed',
...(resetAtMs !== undefined ? { resetAtMs } : {}),
...(typeof info.rateLimitType === 'string' ? { rateLimitType: info.rateLimitType } : {}),
...(typeof info.utilization === 'number' ? { utilization: info.utilization } : {}),
};
}
if (record.subtype === 'api_retry' || record.type === 'api_retry') {
const retryDelayMs = typeof record.retry_delay_ms === 'number' ? record.retry_delay_ms : undefined;
return {
provider: 'claude-subscription',
status: 'warning',
...(retryDelayMs !== undefined ? { retryAfterMs: retryDelayMs } : {}),
rateLimitType: 'api_retry',
};
}
return null;
}
function managedMcpSettings(serverNames: string[]): NonNullable<Options['managedSettings']> {
return {
allowManagedMcpServersOnly: true,
@ -217,21 +297,63 @@ async function collectResult(params: {
allowedToolIds: Set<string>;
expectedMcpServerNames: Set<string>;
onAssistantTurn?: () => Promise<void>;
}): Promise<SDKResultMessage> {
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
abortSignal?: AbortSignal;
}): Promise<ClaudeQueryOutcome> {
let result: SDKResultMessage | undefined;
for await (const message of params.query({ prompt: params.prompt, options: params.options })) {
assertInitIsolation(message, params.allowedToolIds, params.expectedMcpServerNames);
if (countsAsAssistantTurn(message)) {
await params.onAssistantTurn?.();
}
if (isResult(message)) {
result = message;
let rejectedRateLimitSignal: RateLimitSignal | undefined;
throwIfAborted(params.abortSignal);
await params.rateLimitGovernor?.waitForReady(params.abortSignal);
throwIfAborted(params.abortSignal);
const queryResult = params.query({ prompt: params.prompt, options: params.options });
const onAbort = () => {
void Promise.resolve(queryResult.interrupt?.()).catch(() => undefined);
};
params.abortSignal?.addEventListener('abort', onAbort, { once: true });
try {
for await (const message of queryResult) {
throwIfAborted(params.abortSignal);
const rateLimitSignal = claudeRateLimitSignal(message);
if (rateLimitSignal) {
if (rateLimitSignal.status === 'rejected') {
rejectedRateLimitSignal = rateLimitSignal;
}
params.rateLimitGovernor?.report(rateLimitSignal);
}
assertInitIsolation(message, params.allowedToolIds, params.expectedMcpServerNames);
if (countsAsAssistantTurn(message)) {
await params.onAssistantTurn?.();
}
if (isResult(message)) {
result = message;
}
}
} finally {
params.abortSignal?.removeEventListener('abort', onAbort);
}
if (params.abortSignal?.aborted) {
throw createAbortError();
}
if (!result) {
throw new Error('Claude Code query returned no result message');
}
return result;
return {
result,
...(rejectedRateLimitSignal ? { rejectedRateLimitSignal } : {}),
};
}
async function collectResultWithRateLimitRetry(params: Parameters<typeof collectResult>[0]): Promise<SDKResultMessage> {
// maxRetryAttempts() returns 1 when no governor is present or pacing is
// disabled, so a rate-limited result surfaces without an extra query; the
// Claude Code SDK applies its own backoff for transient rejections.
const maxAttempts = params.rateLimitGovernor?.maxRetryAttempts() ?? 1;
for (let attempt = 0; ; attempt += 1) {
const outcome = await collectResult(params);
if (!isClaudeRateLimitResult(outcome.result, outcome.rejectedRateLimitSignal) || attempt >= maxAttempts - 1) {
return outcome.result;
}
}
}
export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
@ -252,12 +374,14 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
tools: input.tools,
});
const startedAt = Date.now();
const result = await collectResult({
const result = await collectResultWithRateLimitRetry({
query: this.runQuery,
prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'),
options,
allowedToolIds: new Set(mcpToolIds(input.tools ?? {})),
expectedMcpServerNames: expectedMcpServerNames(input.tools),
rateLimitGovernor: this.deps.rateLimitGovernor,
abortSignal: input.abortSignal,
});
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: claudeTokenUsage(result) });
const error = resultError(result);
@ -289,12 +413,14 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
outputFormat: { type: 'json_schema' as const, schema: jsonSchema(input.schema as z.ZodType) },
};
const startedAt = Date.now();
const result = await collectResult({
const result = await collectResultWithRateLimitRetry({
query: this.runQuery,
prompt: [input.system, input.prompt].filter(Boolean).join('\n\n'),
options,
allowedToolIds: new Set([...mcpToolIds(input.tools ?? {}), STRUCTURED_OUTPUT_TOOL_NAME]),
expectedMcpServerNames: expectedMcpServerNames(input.tools),
rateLimitGovernor: this.deps.rateLimitGovernor,
abortSignal: input.abortSignal,
});
input.onMetrics?.({ totalMs: Date.now() - startedAt, usage: claudeTokenUsage(result) });
const error = resultError(result);
@ -319,12 +445,14 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
maxTurns: params.stepBudget,
tools: params.toolSet,
});
const result = await collectResult({
const result = await collectResultWithRateLimitRetry({
query: this.runQuery,
prompt: params.userPrompt,
options: { ...options, systemPrompt: params.systemPrompt },
allowedToolIds: new Set(mcpToolIds(params.toolSet)),
expectedMcpServerNames: expectedMcpServerNames(params.toolSet),
rateLimitGovernor: this.deps.rateLimitGovernor,
abortSignal: params.abortSignal,
onAssistantTurn: async () => {
stepIndex += 1;
stepBoundariesMs.push(Date.now() - startedAt);
@ -355,6 +483,9 @@ export class ClaudeCodeKtxLlmRuntime implements KtxLlmRuntimePort {
},
};
} catch (error) {
if (isAbortError(error)) {
throw error;
}
const err = error instanceof Error ? error : new Error(String(error));
return {
stopReason: 'error',
@ -388,7 +519,7 @@ export async function runClaudeCodeAuthProbe(input: {
env: input.env,
maxTurns: 1,
});
const result = await collectResult({
const result = await collectResultWithRateLimitRetry({
query: input.query ?? defaultQuery,
prompt: 'Reply with exactly: ok',
options,

View file

@ -1,5 +1,6 @@
import { z } from 'zod';
import { noopLogger, type KtxLogger } from '../core/config.js';
import { isAbortError, linkAbortSignal } from '../core/abort.js';
import { isCompletedAgentStep, summarizeCodexExecEvents, type CodexExecEventSummary } from './codex-exec-events.js';
import {
startCodexRuntimeMcpServer,
@ -8,6 +9,7 @@ import {
import { resolveCodexModel } from './codex-models.js';
import { buildCodexRuntimeConfig } from './codex-runtime-config.js';
import { CodexSdkCliRunner, type CodexSdkRunner } from './codex-sdk-runner.js';
import type { RateLimitGovernor } from './rate-limit-governor.js';
import type {
KtxGenerateObjectInput,
KtxGenerateTextInput,
@ -24,6 +26,7 @@ export interface CodexKtxLlmRuntimeDeps {
runner?: CodexSdkRunner;
startMcpServer?: (input: { projectDir: string; toolSet: KtxRuntimeToolSet }) => Promise<CodexRuntimeMcpServerHandle>;
logger?: KtxLogger;
rateLimitGovernor?: Pick<RateLimitGovernor, 'waitForReady' | 'report' | 'maxRetryAttempts'>;
}
function modelForRole(modelSlots: CodexKtxLlmRuntimeDeps['modelSlots'], role: string): string {
@ -159,6 +162,12 @@ function runtimeToolNames(toolSet: KtxRuntimeToolSet | undefined): string[] {
return Object.values(toolSet ?? {}).map((descriptor) => descriptor.name);
}
const CODEX_RATE_LIMIT_MARKERS = /\b429\b|rate limit|too many requests|quota exceeded|temporarily overloaded/i;
function isCodexRateLimitError(error: Error | undefined): boolean {
return !!error && CODEX_RATE_LIMIT_MARKERS.test(error.message);
}
export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
private readonly runner: CodexSdkRunner;
private readonly logger: KtxLogger;
@ -168,6 +177,37 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
this.logger = deps.logger ?? noopLogger;
}
private async runWithRateLimitRetry<T>(
abortSignal: AbortSignal | undefined,
run: () => Promise<T>,
getError: (result: T) => Error | undefined,
): Promise<T> {
// maxRetryAttempts() returns 1 when no governor is present or pacing is
// disabled, so an opaque rate-limit failure surfaces on the first attempt
// instead of being retried with no backoff.
const maxAttempts = this.deps.rateLimitGovernor?.maxRetryAttempts() ?? 1;
for (let attempt = 0; ; attempt += 1) {
await this.deps.rateLimitGovernor?.waitForReady(abortSignal);
const lastAttempt = attempt >= maxAttempts - 1;
try {
const result = await run();
const error = getError(result);
if (!isCodexRateLimitError(error) || lastAttempt) {
return result;
}
} catch (error) {
if (isAbortError(error)) {
throw error;
}
const err = error instanceof Error ? error : new Error(String(error));
if (!isCodexRateLimitError(err) || lastAttempt) {
throw error;
}
}
this.deps.rateLimitGovernor?.report({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' });
}
}
async generateText(input: KtxGenerateTextInput): Promise<string> {
const startedAt = Date.now();
const model = modelForRole(this.deps.modelSlots, input.role);
@ -190,18 +230,26 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
}
: {}),
});
const collected = await collectEvents(
await this.runner.runStreamed({
projectDir: this.deps.projectDir,
model,
prompt: promptWithSystem(input.system, input.prompt),
configOverrides: config.configOverrides,
env: config.env,
}),
const result = await this.runWithRateLimitRetry(
input.abortSignal,
async () => {
const collected = await collectEvents(
await this.runner.runStreamed({
projectDir: this.deps.projectDir,
model,
prompt: promptWithSystem(input.system, input.prompt),
configOverrides: config.configOverrides,
env: config.env,
...(input.abortSignal ? { signal: input.abortSignal } : {}),
}),
);
const summary = summarizeCodexExecEvents(collected.events, { startedAt });
return { collected, summary };
},
({ collected, summary }) => summaryError(summary, collected.streamError),
);
const summary = summarizeCodexExecEvents(collected.events, { startedAt });
input.onMetrics?.(metrics(summary, startedAt));
return assertSuccessfulText(summary, collected.streamError);
input.onMetrics?.(metrics(result.summary, startedAt));
return assertSuccessfulText(result.summary, result.collected.streamError);
} finally {
await mcp?.close();
}
@ -231,19 +279,27 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
}
: {}),
});
const collected = await collectEvents(
await this.runner.runStreamed({
projectDir: this.deps.projectDir,
model,
prompt: promptWithSystem(input.system, input.prompt),
configOverrides: config.configOverrides,
env: config.env,
outputSchema: z.toJSONSchema(input.schema, { target: 'draft-7' }) as Record<string, unknown>,
}),
const result = await this.runWithRateLimitRetry(
input.abortSignal,
async () => {
const collected = await collectEvents(
await this.runner.runStreamed({
projectDir: this.deps.projectDir,
model,
prompt: promptWithSystem(input.system, input.prompt),
configOverrides: config.configOverrides,
env: config.env,
outputSchema: z.toJSONSchema(input.schema, { target: 'draft-7' }) as Record<string, unknown>,
...(input.abortSignal ? { signal: input.abortSignal } : {}),
}),
);
const summary = summarizeCodexExecEvents(collected.events, { startedAt });
return { collected, summary };
},
({ collected, summary }) => summaryError(summary, collected.streamError),
);
const summary = summarizeCodexExecEvents(collected.events, { startedAt });
input.onMetrics?.(metrics(summary, startedAt));
return parseStructuredOutput(input.schema, assertSuccessfulText(summary, collected.streamError));
input.onMetrics?.(metrics(result.summary, startedAt));
return parseStructuredOutput(input.schema, assertSuccessfulText(result.summary, result.collected.streamError));
} finally {
await mcp?.close();
}
@ -272,7 +328,6 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
}
: {}),
});
const abortController = new AbortController();
const onStep = async (stepIndex: number): Promise<void> => {
try {
await params.onStepFinish?.({ stepIndex, stepBudget: params.stepBudget });
@ -282,31 +337,50 @@ export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
);
}
};
const collected = await collectEvents(
await this.runner.runStreamed({
projectDir: this.deps.projectDir,
model,
prompt: promptWithSystem(params.systemPrompt, params.userPrompt),
configOverrides: config.configOverrides,
env: config.env,
signal: abortController.signal,
}),
{ stepBudget: params.stepBudget, abortController, onStep },
const result = await this.runWithRateLimitRetry(
params.abortSignal,
async () => {
const linked = linkAbortSignal(params.abortSignal);
const abortController = linked.controller;
try {
const collected = await collectEvents(
await this.runner.runStreamed({
projectDir: this.deps.projectDir,
model,
prompt: promptWithSystem(params.systemPrompt, params.userPrompt),
configOverrides: config.configOverrides,
env: config.env,
signal: abortController.signal,
}),
{ stepBudget: params.stepBudget, abortController, onStep },
);
const summary = summarizeCodexExecEvents(collected.events, { startedAt });
return { collected, summary };
} finally {
linked.dispose();
}
},
({ collected, summary }) => summaryError(summary, collected.streamError),
);
const summary = summarizeCodexExecEvents(collected.events, { startedAt });
const error = summaryError(summary, collected.streamError);
const stopReason = collected.budgetExceeded ? 'budget' : error ? 'error' : summary.stopReason;
const error = summaryError(result.summary, result.collected.streamError);
if (isAbortError(error)) {
throw error;
}
const stopReason = result.collected.budgetExceeded ? 'budget' : error ? 'error' : result.summary.stopReason;
return {
stopReason,
...(stopReason === 'error' && error ? { error } : {}),
metrics: {
totalMs: Date.now() - startedAt,
usage: summary.usage,
stepCount: summary.stepCount,
stepBoundariesMs: summary.stepBoundariesMs,
usage: result.summary.usage,
stepCount: result.summary.stepCount,
stepBoundariesMs: result.summary.stepBoundariesMs,
},
};
} catch (error) {
if (isAbortError(error)) {
throw error;
}
const err = error instanceof Error ? error : new Error(String(error));
return {
stopReason: 'error',

View file

@ -6,16 +6,28 @@ import type { KtxProjectEmbeddingConfig, KtxProjectLlmConfig } from '../project/
import { AiSdkKtxLlmRuntime } from './ai-sdk-runtime.js';
import { ClaudeCodeKtxLlmRuntime } from './claude-code-runtime.js';
import { CodexKtxLlmRuntime } from './codex-runtime.js';
import type { RateLimitGovernor } from './rate-limit-governor.js';
import type { KtxLlmRuntimePort } from './runtime-port.js';
type ClaudeCodeRuntimeDeps = ConstructorParameters<typeof ClaudeCodeKtxLlmRuntime>[0] & {
rateLimitGovernor?: RateLimitGovernor;
};
type CodexRuntimeDeps = ConstructorParameters<typeof CodexKtxLlmRuntime>[0] & {
rateLimitGovernor?: RateLimitGovernor;
};
type AiSdkRuntimeDeps = ConstructorParameters<typeof AiSdkKtxLlmRuntime>[0] & {
rateLimitGovernor?: RateLimitGovernor;
};
interface LocalConfigDeps {
env?: NodeJS.ProcessEnv;
projectDir?: string;
rateLimitGovernor?: RateLimitGovernor;
createKtxLlmProvider?: typeof createKtxLlmProvider;
createKtxEmbeddingProvider?: typeof createKtxEmbeddingProvider;
createClaudeCodeRuntime?: (deps: ConstructorParameters<typeof ClaudeCodeKtxLlmRuntime>[0]) => KtxLlmRuntimePort;
createCodexRuntime?: (deps: ConstructorParameters<typeof CodexKtxLlmRuntime>[0]) => KtxLlmRuntimePort;
createAiSdkRuntime?: (deps: { llmProvider: KtxLlmProvider }) => KtxLlmRuntimePort;
createClaudeCodeRuntime?: (deps: ClaudeCodeRuntimeDeps) => KtxLlmRuntimePort;
createCodexRuntime?: (deps: CodexRuntimeDeps) => KtxLlmRuntimePort;
createAiSdkRuntime?: (deps: AiSdkRuntimeDeps) => KtxLlmRuntimePort;
}
function resolveOptional(value: string | undefined, env: NodeJS.ProcessEnv): string | undefined {
@ -129,6 +141,7 @@ export function createLocalKtxLlmRuntimeFromConfig(
projectDir,
modelSlots: resolved.modelSlots,
env: deps.env,
rateLimitGovernor: deps.rateLimitGovernor,
});
}
if (resolved.backend === 'codex') {
@ -139,10 +152,14 @@ export function createLocalKtxLlmRuntimeFromConfig(
return (deps.createCodexRuntime ?? ((runtimeDeps) => new CodexKtxLlmRuntime(runtimeDeps)))({
projectDir,
modelSlots: resolved.modelSlots,
rateLimitGovernor: deps.rateLimitGovernor,
});
}
const llmProvider = (deps.createKtxLlmProvider ?? createKtxLlmProvider)(resolved);
return (deps.createAiSdkRuntime ?? ((runtimeDeps) => new AiSdkKtxLlmRuntime(runtimeDeps)))({ llmProvider });
return (deps.createAiSdkRuntime ?? ((runtimeDeps) => new AiSdkKtxLlmRuntime(runtimeDeps)))({
llmProvider,
rateLimitGovernor: deps.rateLimitGovernor,
});
}
export function resolveLocalKtxEmbeddingConfig(

View file

@ -0,0 +1,387 @@
import { createAbortError, throwIfAborted } from '../core/abort.js';
export type RateLimitProvider = 'claude-subscription' | 'anthropic-api' | 'vertex' | 'codex';
type RateLimitSignalStatus = 'allowed' | 'warning' | 'rejected';
export interface RateLimitSignal {
provider: RateLimitProvider;
status: RateLimitSignalStatus;
resetAtMs?: number;
retryAfterMs?: number;
utilization?: number;
rateLimitType?: string;
}
export interface RateLimitRetryConfig {
maxAttempts: number;
baseDelayMs: number;
maxDelayMs: number;
jitter: boolean;
}
export interface RateLimitGovernorConfig {
enabled: boolean;
maxConcurrency: number;
throttleThreshold: number;
minConcurrencyUnderPressure: number;
maxWaitMs?: number;
waitStateTickMs: number;
retry: RateLimitRetryConfig;
}
export type RateLimitWaitState =
| {
kind: 'rate_limit_observed';
provider: RateLimitProvider;
status: RateLimitSignalStatus;
rateLimitType?: string;
resetAtMs?: number;
retryAfterMs?: number;
utilization?: number;
}
| {
kind: 'concurrency_adjusted';
provider: RateLimitProvider;
from: number;
to: number;
reason: string;
rateLimitType?: string;
utilization?: number;
}
| {
kind: 'wait_started' | 'wait_tick' | 'wait_finished';
provider: RateLimitProvider;
rateLimitType?: string;
resumeAtMs: number;
remainingMs: number;
};
export interface RateLimitGovernorDeps {
now?: () => number;
sleep?: (ms: number, signal?: AbortSignal) => Promise<void>;
random?: () => number;
}
export type RateLimitRelease = () => void;
type Subscriber = (state: RateLimitWaitState) => void;
const defaultSleep = (ms: number, signal?: AbortSignal): Promise<void> =>
new Promise((resolve, reject) => {
if (signal?.aborted) {
reject(createAbortError());
return;
}
const timeout = setTimeout(resolve, ms);
signal?.addEventListener(
'abort',
() => {
clearTimeout(timeout);
reject(createAbortError());
},
{ once: true },
);
});
export function createRateLimitGovernorConfig(
input: Partial<RateLimitGovernorConfig> & { retry?: Partial<RateLimitRetryConfig> } = {},
): RateLimitGovernorConfig {
return {
enabled: input.enabled ?? true,
maxConcurrency: input.maxConcurrency ?? 1,
throttleThreshold: input.throttleThreshold ?? 0.8,
minConcurrencyUnderPressure: input.minConcurrencyUnderPressure ?? 1,
...(input.maxWaitMs !== undefined ? { maxWaitMs: input.maxWaitMs } : {}),
waitStateTickMs: input.waitStateTickMs ?? 1_000,
retry: {
maxAttempts: input.retry?.maxAttempts ?? 6,
baseDelayMs: input.retry?.baseDelayMs ?? 1_000,
maxDelayMs: input.retry?.maxDelayMs ?? 60_000,
jitter: input.retry?.jitter ?? true,
},
};
}
export class RateLimitGovernor {
private readonly now: () => number;
private readonly sleep: (ms: number, signal?: AbortSignal) => Promise<void>;
private readonly random: () => number;
private readonly subscribers = new Set<Subscriber>();
private waiters: Array<() => void> = [];
private active = 0;
private effectiveLimit: number;
private pausedUntilMs: number | null = null;
private pausedProvider: RateLimitProvider | null = null;
private pausedRateLimitType: string | undefined;
private pausedTickMs: number | null = null;
private opaqueAttempts = new Map<RateLimitProvider, number>();
private pauseGeneration = 0;
private visibleWaitAbort: AbortController | null = null;
constructor(
private readonly config: RateLimitGovernorConfig,
deps: RateLimitGovernorDeps = {},
) {
this.now = deps.now ?? Date.now;
this.sleep = deps.sleep ?? defaultSleep;
this.random = deps.random ?? Math.random;
this.effectiveLimit = Math.max(1, config.maxConcurrency);
}
currentLimit(): number {
return this.config.enabled ? this.effectiveLimit : this.config.maxConcurrency;
}
/**
* Total attempts a runtime should make for a single rate-limited LLM call,
* including the first try. Returns 1 (no outer retry) when pacing is disabled:
* the outer retry loop only exists to cooperate with this governor's pause, so
* without active pacing there is no backoff to apply and the backend's own
* retry handles transient rejections.
*/
maxRetryAttempts(): number {
return this.config.enabled ? Math.max(1, this.config.retry.maxAttempts) : 1;
}
activeSlots(): number {
return this.active;
}
subscribe(cb: Subscriber): () => void {
this.subscribers.add(cb);
if (this.pausedUntilMs !== null) {
this.startVisibleWaitTicker();
}
return () => {
this.subscribers.delete(cb);
if (this.subscribers.size === 0) {
this.stopVisibleWaitTicker();
this.wakeWaiters();
}
};
}
report(signal: RateLimitSignal): void {
if (!this.config.enabled) {
return;
}
this.emit({
kind: 'rate_limit_observed',
provider: signal.provider,
status: signal.status,
...(signal.rateLimitType ? { rateLimitType: signal.rateLimitType } : {}),
...(signal.resetAtMs !== undefined ? { resetAtMs: signal.resetAtMs } : {}),
...(signal.retryAfterMs !== undefined ? { retryAfterMs: signal.retryAfterMs } : {}),
...(signal.utilization !== undefined ? { utilization: signal.utilization } : {}),
});
if (signal.status === 'rejected') {
this.applyPause(signal);
return;
}
if (signal.status === 'warning' || (signal.utilization ?? 0) >= this.config.throttleThreshold) {
this.adjustLimit(Math.max(1, this.config.minConcurrencyUnderPressure), signal, 'provider pressure');
return;
}
this.opaqueAttempts.delete(signal.provider);
if ((signal.utilization ?? 0) < this.config.throttleThreshold) {
this.adjustLimit(Math.max(1, this.config.maxConcurrency), signal, 'provider recovered');
}
}
async waitForReady(signal?: AbortSignal): Promise<void> {
throwIfAborted(signal);
if (!this.config.enabled) {
return;
}
await this.waitForPause(signal);
throwIfAborted(signal);
}
async acquireWorkSlot(signal?: AbortSignal): Promise<RateLimitRelease> {
throwIfAborted(signal);
if (!this.config.enabled) {
this.active += 1;
return () => {
this.active -= 1;
};
}
while (true) {
throwIfAborted(signal);
await this.waitForPause(signal);
throwIfAborted(signal);
if (this.active < this.effectiveLimit) {
this.active += 1;
let released = false;
return () => {
if (released) return;
released = true;
this.active -= 1;
this.wakeWaiters();
};
}
await this.waitForSlot(signal);
}
}
private applyPause(signal: RateLimitSignal): void {
const resumeAtMs = this.resumeAtMsFor(signal);
const boundedResumeAtMs =
this.config.maxWaitMs === undefined ? resumeAtMs : Math.min(resumeAtMs, this.now() + this.config.maxWaitMs);
if (this.pausedUntilMs === null || boundedResumeAtMs > this.pausedUntilMs) {
this.pausedUntilMs = boundedResumeAtMs;
this.pausedProvider = signal.provider;
this.pausedRateLimitType = signal.rateLimitType;
this.pausedTickMs = signal.rateLimitType === 'opaque' ? Math.max(1, boundedResumeAtMs - this.now()) : null;
this.emitWait('wait_started');
this.startVisibleWaitTicker();
this.wakeWaiters();
}
this.adjustLimit(Math.max(1, this.config.minConcurrencyUnderPressure), signal, 'provider rejected');
}
private resumeAtMsFor(signal: RateLimitSignal): number {
if (signal.resetAtMs !== undefined) {
return signal.resetAtMs;
}
if (signal.retryAfterMs !== undefined) {
return this.now() + signal.retryAfterMs;
}
const attempts = this.opaqueAttempts.get(signal.provider) ?? 0;
this.opaqueAttempts.set(signal.provider, Math.min(attempts + 1, this.config.retry.maxAttempts));
const base = Math.min(
this.config.retry.maxDelayMs,
this.config.retry.baseDelayMs * 2 ** Math.min(attempts, this.config.retry.maxAttempts - 1),
);
const jitterMultiplier = this.config.retry.jitter ? 0.75 + this.random() * 0.5 : 1;
return this.now() + Math.round(base * jitterMultiplier);
}
private adjustLimit(to: number, signal: RateLimitSignal, reason: string): void {
const bounded = Math.max(1, Math.min(this.config.maxConcurrency, to));
if (bounded === this.effectiveLimit) {
return;
}
const from = this.effectiveLimit;
this.effectiveLimit = bounded;
this.emit({
kind: 'concurrency_adjusted',
provider: signal.provider,
from,
to: bounded,
reason,
...(signal.rateLimitType ? { rateLimitType: signal.rateLimitType } : {}),
...(signal.utilization !== undefined ? { utilization: signal.utilization } : {}),
});
this.wakeWaiters();
}
private startVisibleWaitTicker(): void {
if (this.subscribers.size === 0 || this.pausedUntilMs === null) {
return;
}
this.stopVisibleWaitTicker();
const generation = (this.pauseGeneration += 1);
const controller = new AbortController();
this.visibleWaitAbort = controller;
void this.runVisibleWaitTicker(generation, controller.signal).catch(() => undefined);
}
private stopVisibleWaitTicker(): void {
this.visibleWaitAbort?.abort();
this.visibleWaitAbort = null;
}
private async runVisibleWaitTicker(generation: number, signal: AbortSignal): Promise<void> {
while (!signal.aborted && generation === this.pauseGeneration && this.pausedUntilMs !== null) {
const remainingMs = this.pausedUntilMs - this.now();
if (remainingMs <= 0) {
this.finishPause(generation);
return;
}
this.emitWait('wait_tick');
await this.sleep(Math.min(this.pausedTickMs ?? this.config.waitStateTickMs, remainingMs), signal);
}
}
private finishPause(generation?: number): void {
if (generation !== undefined && generation !== this.pauseGeneration) {
return;
}
this.emitWait('wait_finished');
this.pausedUntilMs = null;
this.pausedProvider = null;
this.pausedRateLimitType = undefined;
this.pausedTickMs = null;
this.stopVisibleWaitTicker();
this.wakeWaiters();
}
private async waitForPause(signal?: AbortSignal): Promise<void> {
throwIfAborted(signal);
while (this.pausedUntilMs !== null) {
const remainingMs = this.pausedUntilMs - this.now();
if (remainingMs <= 0) {
this.finishPause();
return;
}
if (this.visibleWaitAbort !== null) {
await this.waitForSlot(signal);
} else {
await this.sleep(Math.min(this.pausedTickMs ?? this.config.waitStateTickMs, remainingMs), signal);
}
throwIfAborted(signal);
}
}
private waitForSlot(signal?: AbortSignal): Promise<void> {
if (signal?.aborted) {
return Promise.reject(createAbortError());
}
return new Promise((resolve, reject) => {
const wake = () => {
cleanup();
resolve();
};
const onAbort = () => {
cleanup();
reject(createAbortError());
};
const cleanup = () => {
this.waiters = this.waiters.filter((candidate) => candidate !== wake);
signal?.removeEventListener('abort', onAbort);
};
this.waiters.push(wake);
signal?.addEventListener('abort', onAbort, { once: true });
});
}
private wakeWaiters(): void {
const waiters = this.waiters;
this.waiters = [];
for (const waiter of waiters) {
waiter();
}
}
private emitWait(kind: Extract<RateLimitWaitState['kind'], 'wait_started' | 'wait_tick' | 'wait_finished'>): void {
if (this.pausedUntilMs === null || this.pausedProvider === null) {
return;
}
this.emit({
kind,
provider: this.pausedProvider,
...(this.pausedRateLimitType ? { rateLimitType: this.pausedRateLimitType } : {}),
resumeAtMs: this.pausedUntilMs,
remainingMs: Math.max(0, this.pausedUntilMs - this.now()),
});
}
private emit(state: RateLimitWaitState): void {
for (const subscriber of this.subscribers) {
subscriber(state);
}
}
}

View file

@ -49,6 +49,7 @@ export interface RunLoopParams {
stepBudget: number;
telemetryTags: Record<string, string>;
onStepFinish?: (info: RunLoopStepInfo) => void | Promise<void>;
abortSignal?: AbortSignal;
}
export interface RunLoopResult {
@ -64,6 +65,7 @@ export interface KtxGenerateTextInput {
tools?: KtxRuntimeToolSet;
temperature?: number;
onMetrics?: (metrics: { totalMs: number; usage: LlmTokenUsage }) => void;
abortSignal?: AbortSignal;
}
export interface KtxGenerateObjectInput<TOutput, TSchema extends z.ZodType<TOutput>> {
@ -74,6 +76,7 @@ export interface KtxGenerateObjectInput<TOutput, TSchema extends z.ZodType<TOutp
temperature?: number;
schema: TSchema;
onMetrics?: (metrics: { totalMs: number; usage: LlmTokenUsage }) => void;
abortSignal?: AbortSignal;
}
export interface KtxLlmRuntimePort {

View file

@ -100,6 +100,44 @@ const workUnitsSchema = z
})
.describe('Concurrency and failure handling for ingest work units.');
const ingestRateLimitRetrySchema = z
.strictObject({
maxAttempts: z
.int()
.positive()
.default(6)
.describe(
'Maximum attempts for a single rate-limited LLM call before the failure surfaces, counting the first try. Also bounds how far opaque backoff grows for providers that do not expose a reset time.',
),
baseDelayMs: z.int().positive().default(1_000).describe('Initial opaque retry delay in milliseconds.'),
maxDelayMs: z.int().positive().default(60_000).describe('Maximum opaque retry delay in milliseconds.'),
jitter: z.boolean().default(true).describe('When true, apply bounded jitter to opaque retry delays.'),
})
.describe('Retry policy for rate-limit responses that do not include a reset time or retry-after value.');
const ingestRateLimitSchema = z
.strictObject({
enabled: z.boolean().default(true).describe('Master switch for ingest LLM rate-limit pacing and visible waits.'),
throttleThreshold: z
.number()
.min(0)
.max(1)
.default(0.8)
.describe('Provider utilization at or above which ingest throttles new work-unit starts.'),
minConcurrencyUnderPressure: z
.int()
.positive()
.default(1)
.describe('Effective work-unit concurrency while a provider is under rate-limit pressure.'),
maxWaitMs: z
.int()
.positive()
.optional()
.describe('Optional cap on a single provider reset wait. Omit to wait indefinitely until the provider reset time.'),
retry: ingestRateLimitRetrySchema.prefault({}).describe('Opaque retry policy for providers without reset hints.'),
})
.describe('Rate-limit pacing and wait policy for ingest LLM calls.');
const ingestSchema = z
.strictObject({
adapters: z
@ -110,6 +148,7 @@ const ingestSchema = z
.prefault({ backend: 'none' })
.describe('Embedding configuration used when ingest adapters need to embed documents.'),
workUnits: workUnitsSchema.prefault({}).describe('Concurrency and failure handling for ingest work units.'),
rateLimit: ingestRateLimitSchema.prefault({}).describe('LLM rate-limit pacing and visible-wait policy for ingest.'),
profile: z
.union([z.boolean(), z.literal('json')])
.default(false)

View file

@ -78,6 +78,7 @@ export interface KtxIngestDeps {
readReportFile?: typeof readIngestReportSnapshotFile;
renderStoredMemoryFlow?: typeof renderMemoryFlowTui;
startLiveMemoryFlow?: typeof startLiveMemoryFlowTui;
abortSignal?: AbortSignal;
env?: NodeJS.ProcessEnv;
localIngestOptions?: Pick<
RunLocalIngestOptions,
@ -93,6 +94,23 @@ export interface KtxIngestDeps {
runtimeIo?: KtxIngestIo;
}
function createCliAbortSignal(): { signal: AbortSignal; dispose: () => void } {
const controller = new AbortController();
let interrupted = false;
const onSigint = () => {
if (interrupted) {
process.exit(130);
}
interrupted = true;
controller.abort(new DOMException('Aborted', 'AbortError'));
};
process.on('SIGINT', onSigint);
return {
signal: controller.signal,
dispose: () => process.off('SIGINT', onSigint),
};
}
const REPORT_SOURCE_LABELS = new Map<string, string>([
['live-database', 'Database schema'],
['historic-sql', 'Query history'],
@ -364,6 +382,12 @@ function plainIngestEventProgress(
message: event.message,
...(event.transient !== undefined ? { transient: event.transient } : {}),
};
case 'rate_limit_wait':
return {
percent: 50,
message: `Rate-limited (${event.provider}${event.rateLimitType ? ` ${event.rateLimitType}` : ''}); resuming in ${Math.ceil(event.remainingMs / 1_000)}s`,
transient: true,
};
case 'work_unit_started': {
const total = plannedWorkUnitCountThrough(snapshot, eventIndex);
const ordinal = workUnitOrdinalThrough(snapshot, eventIndex, event.unitKey);
@ -750,6 +774,8 @@ export async function runKtxIngest(
);
plainProgress?.start();
structuredProgress?.start();
const cliAbort = deps.abortSignal ? null : createCliAbortSignal();
const abortSignal = deps.abortSignal ?? cliAbort?.signal;
let result: LocalMetabaseFanoutResult;
try {
result = await executeMetabaseFanout({
@ -763,6 +789,7 @@ export async function runKtxIngest(
embeddingProvider,
...(memoryFlow ? { memoryFlow } : {}),
...(progress ? { progress } : {}),
...(abortSignal ? { abortSignal } : {}),
});
plainProgress?.flush();
if (args.outputMode === 'json') {
@ -772,6 +799,7 @@ export async function runKtxIngest(
}
} finally {
plainProgress?.flush();
cliAbort?.dispose();
}
return result.status === 'all_failed' ? 1 : 0;
}
@ -820,6 +848,8 @@ export async function runKtxIngest(
plainProgress?.start();
structuredProgress?.start();
const cliAbort = deps.abortSignal ? null : createCliAbortSignal();
const abortSignal = deps.abortSignal ?? cliAbort?.signal;
try {
const result = await executeLocalIngest({
@ -836,6 +866,7 @@ export async function runKtxIngest(
embeddingProvider,
...(args.debugLlmRequestFile ? { llmDebugRequestFile: args.debugLlmRequestFile } : {}),
...(memoryFlow ? { memoryFlow } : {}),
...(abortSignal ? { abortSignal } : {}),
});
if (shouldUseLiveViz && memoryFlow) {
latestMemoryFlowSnapshot = finalRunMemoryFlowInput(memoryFlow.snapshot(), result.report);
@ -854,6 +885,7 @@ export async function runKtxIngest(
} finally {
plainProgress?.flush();
liveTui?.close();
cliAbort?.dispose();
}
}

View file

@ -0,0 +1,31 @@
import { describe, expect, it, vi } from 'vitest';
import { createAbortError, isAbortError, linkAbortSignal, throwIfAborted } from '../../../src/context/core/abort.js';
describe('abort helpers', () => {
it('recognizes DOMException abort errors and common abort-shaped errors', () => {
expect(isAbortError(createAbortError())).toBe(true);
expect(isAbortError(Object.assign(new Error('cancelled'), { name: 'AbortError' }))).toBe(true);
expect(isAbortError(Object.assign(new Error('operation aborted'), { code: 'ABORT_ERR' }))).toBe(true);
expect(isAbortError(new Error('ordinary failure'))).toBe(false);
});
it('throws when the provided signal is already aborted', () => {
const controller = new AbortController();
controller.abort();
expect(() => throwIfAborted(controller.signal)).toThrow(/Aborted/);
});
it('links a child controller to a parent signal and removes the listener on dispose', () => {
const parent = new AbortController();
const child = linkAbortSignal(parent.signal);
expect(child.controller.signal.aborted).toBe(false);
parent.abort();
expect(child.controller.signal.aborted).toBe(true);
const removeSpy = vi.spyOn(parent.signal, 'removeEventListener');
child.dispose();
expect(removeSpy).toHaveBeenCalledWith('abort', expect.any(Function));
});
});

View file

@ -426,6 +426,177 @@ describe('IngestBundleRunner — Stages 1 → 7', () => {
);
});
it('uses the rate-limit governor for work-unit start slots', async () => {
const deps = makeDeps();
const acquireWorkSlot = vi.fn(async () => vi.fn());
const runner = buildRunner(deps, {
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
workUnitMaxConcurrency: 2,
rateLimitGovernor: { acquireWorkSlot, subscribe: vi.fn(() => vi.fn()) } as never,
},
});
deps.adapter.chunk.mockResolvedValue({
workUnits: [
{ unitKey: 'u1', rawFiles: ['a.yml'], peerFileIndex: [], dependencyPaths: [] },
{ unitKey: 'u2', rawFiles: ['b.yml'], peerFileIndex: [], dependencyPaths: [] },
],
});
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([
['a.yml', 'h1'],
['b.yml', 'h2'],
]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await runner.run({
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
});
expect(acquireWorkSlot).toHaveBeenCalledTimes(2);
});
it('passes the job abort signal into rate-limit work-unit slots', async () => {
const deps = makeDeps();
const controller = new AbortController();
const acquireWorkSlot = vi.fn(async () => vi.fn());
const runner = buildRunner(deps, {
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
workUnitMaxConcurrency: 1,
rateLimitGovernor: { acquireWorkSlot, subscribe: vi.fn(() => vi.fn()) } as never,
},
});
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
{ jobId: 'j1', abortSignal: controller.signal, startPhase: () => new TestJobContext('j1', null, async () => undefined, async () => undefined) } as any,
);
expect(acquireWorkSlot).toHaveBeenCalledWith(controller.signal);
});
it('does not convert aborted work-unit agent loops into failed work units', async () => {
const deps = makeDeps();
const controller = new AbortController();
deps.agentRunner.runLoop.mockImplementation(async () => {
controller.abort();
throw new DOMException('Aborted', 'AbortError');
});
const runner = buildRunner(deps, {
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
workUnitMaxConcurrency: 1,
},
});
(runner as any).stageRawFilesStage1 = vi.fn().mockResolvedValue({
currentHashes: new Map([['a.yml', 'h1']]),
rawDirInWorktree: 'raw-sources/c1/fake/s',
});
(runner as any).resolveStagedDir = vi.fn().mockResolvedValue('/tmp/stage/upload-x');
await expect(
runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
{ jobId: 'j1', abortSignal: controller.signal, startPhase: () => new TestJobContext('j1', null, async () => undefined, async () => undefined) } as any,
),
).rejects.toThrow(/Aborted/);
expect(deps.runsRepo.markFailed).toHaveBeenCalledWith('run-1');
expect(deps.reportsRepo.create).not.toHaveBeenCalledWith(
expect.objectContaining({
body: expect.objectContaining({
failedWorkUnits: expect.arrayContaining(['u1']),
}),
}),
);
});
it('emits trace and memory-flow status for rate-limit waits', async () => {
const deps = makeDeps();
let subscriber: ((state: any) => void) | undefined;
const memoryFlow = createMemoryFlowLiveBuffer(bundleReplayInput());
const runner = buildRunner(deps, {
settings: {
probeRowCount: 1,
memoryIngestionModel: 'test-model',
rateLimitGovernor: {
acquireWorkSlot: vi.fn(async () => vi.fn()),
subscribe: vi.fn((cb: (state: any) => void) => {
subscriber = cb;
return vi.fn();
}),
} as never,
},
});
(runner as any).runInner = async (_job: any, ctx: any) => {
subscriber?.({
kind: 'wait_tick',
provider: 'claude-subscription',
rateLimitType: 'five_hour',
resumeAtMs: 2_000,
remainingMs: 1_000,
});
ctx.memoryFlow.emit({ type: 'report_created', runId: 'run-1' });
return {
runId: 'run-1',
syncId: 'sync-1',
diffSummary: { added: 0, modified: 0, deleted: 0, unchanged: 0 },
workUnitCount: 0,
failedWorkUnits: [],
artifactsWritten: 0,
commitSha: null,
};
};
await runner.run(
{
jobId: 'j1',
connectionId: 'c1',
sourceKey: 'fake',
trigger: 'upload',
bundleRef: { kind: 'upload', uploadId: 'upload-x' },
},
{ memoryFlow } as any,
);
expect(memoryFlow.snapshot().events).toContainEqual(
expect.objectContaining({
type: 'rate_limit_wait',
provider: 'claude-subscription',
rateLimitType: 'five_hour',
resumeAtMs: 2_000,
remainingMs: 1_000,
}),
);
});
it('fails before squash when reconciliation leaves a touched wiki page with dangling refs', async () => {
const deps = makeDeps();
let currentToolSession: any = null;

View file

@ -301,6 +301,7 @@ describe('createLocalBundleIngestRuntime', () => {
'memoryIngestionModel',
'probeRowCount',
'profileIngest',
'rateLimitGovernor',
'workUnitFailureMode',
'workUnitMaxConcurrency',
'workUnitStepBudget',

View file

@ -146,6 +146,29 @@ describe('memory-flow schemas', () => {
expect(parsed.events).toContainEqual({ type: 'stage_skipped', stage: 'actions', reason: 'requires LLM' });
});
it('accepts rate-limit wait replay events', () => {
expect(
memoryFlowReplayInputSchema.parse({
...snapshot(),
events: [
{
type: 'rate_limit_wait',
provider: 'claude-subscription',
rateLimitType: 'five_hour',
resumeAtMs: 2_000,
remainingMs: 1_000,
},
],
}).events[0],
).toEqual({
type: 'rate_limit_wait',
provider: 'claude-subscription',
rateLimitType: 'five_hour',
resumeAtMs: 2_000,
remainingMs: 1_000,
});
});
it('parses snapshot and closed stream events', () => {
expect(memoryFlowStreamEventSchema.parse({ type: 'snapshot', snapshot: snapshot({ status: 'done' }) })).toEqual({
type: 'snapshot',

View file

@ -107,6 +107,199 @@ describe('AiSdkKtxLlmRuntime.runAgentLoop', () => {
expect(result.error).toBe(err);
});
it('reports AI SDK retry-after rate limits and retries through the governor', async () => {
const waitForReady = vi.fn().mockResolvedValue(undefined);
const report = vi.fn();
const rateLimitError = Object.assign(new Error('too many requests'), {
name: 'TooManyRequestsError',
retryAfter: 2,
statusCode: 429,
});
(generateText as any).mockRejectedValueOnce(rateLimitError).mockResolvedValueOnce({
text: 'done',
toolCalls: [],
steps: [],
usage: { inputTokens: 1, outputTokens: 1, totalTokens: 2 },
});
const runtime = new AiSdkKtxLlmRuntime({
llmProvider: llmProvider as any,
rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never,
});
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
});
expect(result.stopReason).toBe('natural');
expect(report).toHaveBeenCalledWith({
provider: 'anthropic-api',
status: 'rejected',
retryAfterMs: 2_000,
rateLimitType: 'http_429',
});
expect(waitForReady).toHaveBeenCalledTimes(2);
expect(generateText).toHaveBeenCalledTimes(2);
});
it('does not retry AI SDK rate limits without a governor', async () => {
const rateLimitError = Object.assign(new Error('too many requests'), {
name: 'TooManyRequestsError',
statusCode: 429,
});
(generateText as any).mockRejectedValue(rateLimitError);
// The beforeEach runtime is constructed without a rateLimitGovernor.
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
});
expect(result.stopReason).toBe('error');
expect(generateText).toHaveBeenCalledTimes(1);
});
it('honors a governor retry budget of one attempt without retrying', async () => {
const waitForReady = vi.fn().mockResolvedValue(undefined);
const report = vi.fn();
const rateLimitError = Object.assign(new Error('too many requests'), {
name: 'TooManyRequestsError',
statusCode: 429,
});
(generateText as any).mockRejectedValue(rateLimitError);
const runtime = new AiSdkKtxLlmRuntime({
llmProvider: llmProvider as any,
rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 1 } as never,
});
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
});
expect(result.stopReason).toBe('error');
expect(generateText).toHaveBeenCalledTimes(1);
expect(report).not.toHaveBeenCalled();
});
it('reports Anthropic API response-header utilization to the governor', async () => {
const waitForReady = vi.fn().mockResolvedValue(undefined);
const report = vi.fn();
(generateText as any).mockResolvedValue({
text: 'done',
toolCalls: [],
steps: [],
response: {
headers: {
'anthropic-ratelimit-requests-limit': '100',
'anthropic-ratelimit-requests-remaining': '8',
'anthropic-ratelimit-input-tokens-limit': '10000',
'anthropic-ratelimit-input-tokens-remaining': '9000',
},
},
});
const runtime = new AiSdkKtxLlmRuntime({
llmProvider: llmProvider as any,
rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never,
});
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
});
expect(result.stopReason).toBe('natural');
expect(report).toHaveBeenCalledWith({
provider: 'anthropic-api',
status: 'allowed',
rateLimitType: 'rpm',
utilization: 0.92,
});
});
it('reports generic x-ratelimit response-header utilization for Vertex providers', async () => {
const waitForReady = vi.fn().mockResolvedValue(undefined);
const report = vi.fn();
const vertexProvider = {
...llmProvider,
getModel: vi.fn().mockReturnValue({ modelId: 'gemini-3-pro', provider: 'google-vertex' }),
};
(generateText as any).mockResolvedValue({
text: 'done',
toolCalls: [],
steps: [],
response: {
headers: {
'x-ratelimit-limit-requests': '200',
'x-ratelimit-remaining-requests': '30',
'x-ratelimit-limit-tokens': '100000',
'x-ratelimit-remaining-tokens': '4000',
},
},
});
const runtime = new AiSdkKtxLlmRuntime({
llmProvider: vertexProvider as any,
rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never,
});
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
});
expect(result.stopReason).toBe('natural');
expect(report).toHaveBeenCalledWith({
provider: 'vertex',
status: 'allowed',
rateLimitType: 'tpm',
utilization: 0.96,
});
});
it('passes abort signals into governor waits and AI SDK generateText calls', async () => {
const controller = new AbortController();
const waitForReady = vi.fn().mockResolvedValue(undefined);
(generateText as any).mockResolvedValue({ text: 'done', toolCalls: [], steps: [] });
const runtime = new AiSdkKtxLlmRuntime({
llmProvider: llmProvider as any,
rateLimitGovernor: { waitForReady, report: vi.fn(), maxRetryAttempts: () => 6 } as never,
});
const result = await runtime.runAgentLoop({
modelRole: 'candidateExtraction',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
abortSignal: controller.signal,
});
expect(result.stopReason).toBe('natural');
expect(waitForReady).toHaveBeenCalledWith(controller.signal);
expect((generateText as any).mock.calls[0][0].abortSignal).toBe(controller.signal);
});
it('returns metrics with stepCount, per-step boundaries, and aggregate token usage', async () => {
(generateText as any).mockImplementation(async (opts: any) => {
await opts.onStepFinish({});

View file

@ -9,6 +9,14 @@ async function* stream(messages: SDKMessage[]): AsyncGenerator<SDKMessage, void>
}
}
function deferred<T>() {
let resolve!: (value: T | PromiseLike<T>) => void;
const promise = new Promise<T>((innerResolve) => {
resolve = innerResolve;
});
return { promise, resolve };
}
function initMessage(overrides: Partial<Extract<SDKMessage, { type: 'system'; subtype: 'init' }>> = {}): Extract<
SDKMessage,
{ type: 'system'; subtype: 'init' }
@ -91,6 +99,247 @@ describe('ClaudeCodeKtxLlmRuntime', () => {
});
});
it('waits before Claude Code text generation and reports rate-limit events', async () => {
const waitForReady = vi.fn().mockResolvedValue(undefined);
const report = vi.fn();
const query = vi.fn((_input: any) =>
stream([
{
type: 'rate_limit_event',
rate_limit_info: {
status: 'allowed_warning',
resetsAt: new Date(2_000).toISOString(),
rateLimitType: 'five_hour',
utilization: 0.91,
},
} as unknown as SDKMessage,
resultMessage({ result: 'ok' }),
]),
);
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never,
});
await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok');
expect(waitForReady).toHaveBeenCalledTimes(1);
expect(report).toHaveBeenCalledWith({
provider: 'claude-subscription',
status: 'warning',
resetAtMs: 2_000,
rateLimitType: 'five_hour',
utilization: 0.91,
});
});
it('maps numeric Claude Code reset times from SDK rate-limit events', async () => {
const report = vi.fn();
const resetAtMs = 1_700_000_000_000;
const query = vi.fn((_input: any) =>
stream([
{
type: 'rate_limit_event',
rate_limit_info: {
status: 'rejected',
resetsAt: resetAtMs,
rateLimitType: 'five_hour',
utilization: 1,
},
} as unknown as SDKMessage,
resultMessage({ result: 'ok' }),
]),
);
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report, maxRetryAttempts: () => 6 } as never,
});
await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok');
expect(report).toHaveBeenCalledWith({
provider: 'claude-subscription',
status: 'rejected',
resetAtMs,
rateLimitType: 'five_hour',
utilization: 1,
});
});
it('retries a Claude Code query after an SDK rate-limit result error', async () => {
const waitForReady = vi.fn().mockResolvedValue(undefined);
const report = vi.fn();
const resetAtMs = 1_700_000_000_000;
const query = vi
.fn()
.mockReturnValueOnce(
stream([
{
type: 'rate_limit_event',
rate_limit_info: {
status: 'rejected',
resetsAt: resetAtMs,
rateLimitType: 'five_hour',
utilization: 1,
},
} as unknown as SDKMessage,
resultMessage({
subtype: 'error_during_execution',
is_error: true,
result: '',
errors: ['rate limit retry budget exhausted'],
terminal_reason: 'model_error',
} as never),
]),
)
.mockReturnValueOnce(stream([resultMessage({ result: 'ok' })]));
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never,
});
await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok');
expect(query).toHaveBeenCalledTimes(2);
expect(waitForReady).toHaveBeenCalledTimes(2);
expect(report).toHaveBeenCalledWith({
provider: 'claude-subscription',
status: 'rejected',
resetAtMs,
rateLimitType: 'five_hour',
utilization: 1,
});
});
it('reports Claude Code api retry messages as warning signals', async () => {
const report = vi.fn();
const query = vi.fn((_input: any) =>
stream([
{
type: 'system',
subtype: 'api_retry',
retry_delay_ms: 12_000,
} as unknown as SDKMessage,
resultMessage({ result: 'ok' }),
]),
);
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report, maxRetryAttempts: () => 6 } as never,
});
await runtime.generateText({ role: 'default', prompt: 'hello' });
expect(report).toHaveBeenCalledWith({
provider: 'claude-subscription',
status: 'warning',
retryAfterMs: 12_000,
rateLimitType: 'api_retry',
});
});
it('passes abort signals into Claude Code governor waits', async () => {
const controller = new AbortController();
const waitForReady = vi.fn().mockResolvedValue(undefined);
const query = vi.fn((_input: any) => stream([resultMessage({ result: 'ok' })]));
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
rateLimitGovernor: { waitForReady, report: vi.fn(), maxRetryAttempts: () => 6 } as never,
});
await expect(runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal })).resolves.toBe('ok');
expect(waitForReady).toHaveBeenCalledWith(controller.signal);
});
it('interrupts an active Claude Code query when the abort signal fires', async () => {
const controller = new AbortController();
const streamStarted = deferred<void>();
const releaseStream = deferred<void>();
const interrupt = vi.fn(() => releaseStream.resolve());
const queryResult = {
async *[Symbol.asyncIterator]() {
streamStarted.resolve();
await releaseStream.promise;
yield resultMessage({ result: 'ok' });
},
interrupt,
};
const query = vi.fn(() => queryResult as never);
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report: vi.fn(), maxRetryAttempts: () => 6 } as never,
});
const pending = runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal });
await streamStarted.promise;
controller.abort();
await expect(pending).rejects.toThrow(/Aborted/);
expect(interrupt).toHaveBeenCalledTimes(1);
});
it('throws abort before starting Claude Code query when the signal is already aborted', async () => {
const controller = new AbortController();
controller.abort();
const query = vi.fn((_input: any) => stream([resultMessage({ result: 'ok' })]));
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report: vi.fn(), maxRetryAttempts: () => 6 } as never,
});
await expect(runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal })).rejects.toThrow(/Aborted/);
expect(query).not.toHaveBeenCalled();
});
it('treats an interrupted Claude Code stream with no result as abort', async () => {
const controller = new AbortController();
const streamStarted = deferred<void>();
const releaseStream = deferred<void>();
const interrupt = vi.fn(() => releaseStream.resolve());
const queryResult = {
async *[Symbol.asyncIterator]() {
streamStarted.resolve();
await releaseStream.promise;
},
interrupt,
};
const query = vi.fn(() => queryResult as never);
const runtime = new ClaudeCodeKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'sonnet' },
query,
env: {},
rateLimitGovernor: { waitForReady: vi.fn().mockResolvedValue(undefined), report: vi.fn(), maxRetryAttempts: () => 6 } as never,
});
const pending = runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal });
await streamStarted.promise;
controller.abort();
await expect(pending).rejects.toThrow(/Aborted/);
expect(interrupt).toHaveBeenCalledTimes(1);
});
it('validates structured output with the caller schema and whitelists the SDK StructuredOutput tool', async () => {
const schema = z.object({ answer: z.string() });
const query = vi.fn((_input: any) =>

View file

@ -130,6 +130,150 @@ describe('CodexKtxLlmRuntime', () => {
).rejects.toThrow('Codex structured output failed validation');
});
it('reports Codex rate-limit failures and retries with opaque backoff', async () => {
const waitForReady = vi.fn().mockResolvedValue(undefined);
const report = vi.fn();
const fakeRunner = {
runStreamed: vi
.fn()
.mockResolvedValueOnce(events([{ type: 'turn.failed', error: { message: '429 rate limit exceeded' } }]))
.mockResolvedValueOnce(
events([
{ type: 'turn.started' },
{ type: 'item.completed', item: { type: 'agent_message', text: 'ok' } },
{ type: 'turn.completed' },
]),
),
};
const runtime = new CodexKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'codex' },
runner: fakeRunner,
rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never,
});
await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok');
expect(report).toHaveBeenCalledWith({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' });
expect(waitForReady).toHaveBeenCalledTimes(2);
expect(fakeRunner.runStreamed).toHaveBeenCalledTimes(2);
});
it('reports thrown Codex rate-limit failures and retries with opaque backoff', async () => {
const waitForReady = vi.fn().mockResolvedValue(undefined);
const report = vi.fn();
const fakeRunner = {
runStreamed: vi
.fn()
.mockRejectedValueOnce(new Error('ThreadError: 429 rate limit exceeded'))
.mockResolvedValueOnce(
events([
{ type: 'turn.started' },
{ type: 'item.completed', item: { type: 'agent_message', text: 'ok' } },
{ type: 'turn.completed' },
]),
),
};
const runtime = new CodexKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'codex' },
runner: fakeRunner,
rateLimitGovernor: { waitForReady, report, maxRetryAttempts: () => 6 } as never,
});
await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).resolves.toBe('ok');
expect(report).toHaveBeenCalledWith({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' });
expect(waitForReady).toHaveBeenCalledTimes(2);
expect(fakeRunner.runStreamed).toHaveBeenCalledTimes(2);
});
it('surfaces Codex rate-limit failures without retrying when no governor is present', async () => {
const fakeRunner = runner([{ type: 'turn.failed', error: { message: '429 rate limit exceeded' } }]);
const runtime = new CodexKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'codex' },
runner: fakeRunner,
});
await expect(runtime.generateText({ role: 'default', prompt: 'hello' })).rejects.toThrow(/rate limit/i);
expect(fakeRunner.runStreamed).toHaveBeenCalledTimes(1);
});
it('passes abort signals into Codex text generation and governor waits', async () => {
const controller = new AbortController();
const waitForReady = vi.fn().mockResolvedValue(undefined);
let observedSignal: AbortSignal | undefined;
const fakeRunner = {
runStreamed: vi.fn(async (input: { signal?: AbortSignal }) => {
observedSignal = input.signal;
return events([
{ type: 'turn.started' },
{ type: 'item.completed', item: { type: 'agent_message', text: 'ok' } },
{ type: 'turn.completed' },
]);
}),
};
const runtime = new CodexKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'codex' },
runner: fakeRunner,
rateLimitGovernor: { waitForReady, report: vi.fn(), maxRetryAttempts: () => 6 } as never,
});
await expect(runtime.generateText({ role: 'default', prompt: 'hello', abortSignal: controller.signal })).resolves.toBe('ok');
expect(waitForReady).toHaveBeenCalledWith(controller.signal);
expect(observedSignal).toBe(controller.signal);
});
it('links the parent abort signal into Codex agent-loop streamed runs', async () => {
const controller = new AbortController();
let releaseStream!: () => void;
const streamRelease = new Promise<void>((resolve) => {
releaseStream = resolve;
});
let markRunnerCalled!: () => void;
const runnerCalled = new Promise<void>((resolve) => {
markRunnerCalled = resolve;
});
let observedSignal: AbortSignal | undefined;
const fakeRunner = {
runStreamed: vi.fn(async (input: { signal?: AbortSignal }) => {
observedSignal = input.signal;
markRunnerCalled();
return (async function* () {
await streamRelease;
yield { type: 'turn.started' };
yield { type: 'item.completed', item: { type: 'agent_message', text: 'ok' } };
yield { type: 'turn.completed' };
})();
}),
};
const runtime = new CodexKtxLlmRuntime({
projectDir: '/tmp/project',
modelSlots: { default: 'codex' },
runner: fakeRunner,
});
const pending = runtime.runAgentLoop({
modelRole: 'default',
systemPrompt: '',
userPrompt: '',
toolSet: {},
stepBudget: 10,
telemetryTags: {},
abortSignal: controller.signal,
});
await runnerCalled;
expect(observedSignal).toBeDefined();
expect(observedSignal).not.toBe(controller.signal);
controller.abort();
expect(observedSignal?.aborted).toBe(true);
releaseStream();
await expect(pending).resolves.toMatchObject({ stopReason: 'natural' });
});
it('starts and closes a temporary MCP server for tool-backed agent loops', async () => {
const close = vi.fn(async () => undefined);
const startMcpServer = vi.fn(async () => ({

View file

@ -7,6 +7,7 @@ import {
import {
createLocalKtxEmbeddingProviderFromConfig,
createLocalKtxLlmProviderFromConfig,
createLocalKtxLlmRuntimeFromConfig,
resolveLocalKtxEmbeddingConfig,
resolveLocalKtxLlmConfig,
} from '../../../src/context/llm/local-config.js';
@ -129,6 +130,64 @@ describe('local KTX LLM config', () => {
vertexFallbackTo5m: false,
});
});
it('passes the rate-limit governor into created runtimes', () => {
const rateLimitGovernor = {} as never;
const createClaudeCodeRuntime = vi.fn(() => ({
generateText: vi.fn(),
generateObject: vi.fn(),
runAgentLoop: vi.fn(),
}));
const createCodexRuntime = vi.fn(() => ({
generateText: vi.fn(),
generateObject: vi.fn(),
runAgentLoop: vi.fn(),
}));
const createAiSdkRuntime = vi.fn(() => ({
generateText: vi.fn(),
generateObject: vi.fn(),
runAgentLoop: vi.fn(),
}));
const createKtxLlmProvider = vi.fn(() => ({
getModel: vi.fn(),
getModelByName: vi.fn(),
cacheMarker: vi.fn(),
repairToolCallHandler: vi.fn(),
thinkingProviderOptions: vi.fn(),
telemetryConfig: vi.fn(),
promptCachingConfig: vi.fn(),
activeBackend: vi.fn(() => 'anthropic'),
}));
createLocalKtxLlmRuntimeFromConfig(
{
provider: { backend: 'claude-code' },
models: { default: 'sonnet' },
promptCaching: undefined,
},
{ projectDir: '/tmp/project', env: {}, rateLimitGovernor, createClaudeCodeRuntime },
);
createLocalKtxLlmRuntimeFromConfig(
{
provider: { backend: 'codex' },
models: { default: 'codex' },
promptCaching: undefined,
},
{ projectDir: '/tmp/project', env: {}, rateLimitGovernor, createCodexRuntime },
);
createLocalKtxLlmRuntimeFromConfig(
{
provider: { backend: 'anthropic' },
models: { default: 'claude-sonnet-4-6' },
promptCaching: undefined,
},
{ env: {}, rateLimitGovernor, createAiSdkRuntime, createKtxLlmProvider: createKtxLlmProvider as never },
);
expect(createClaudeCodeRuntime).toHaveBeenCalledWith(expect.objectContaining({ rateLimitGovernor }));
expect(createCodexRuntime).toHaveBeenCalledWith(expect.objectContaining({ rateLimitGovernor }));
expect(createAiSdkRuntime).toHaveBeenCalledWith(expect.objectContaining({ rateLimitGovernor }));
});
});
describe('local KTX embedding config', () => {

View file

@ -0,0 +1,278 @@
import { describe, expect, it } from 'vitest';
import {
createRateLimitGovernorConfig,
RateLimitGovernor,
type RateLimitWaitState,
} from '../../../src/context/llm/rate-limit-governor.js';
function testClock(startMs = 1_000) {
let nowMs = startMs;
return {
now: () => nowMs,
advance: (ms: number) => {
nowMs += ms;
},
};
}
async function flushMicrotasks(turns = 10): Promise<void> {
for (let i = 0; i < turns; i += 1) {
await Promise.resolve();
}
}
describe('RateLimitGovernor', () => {
it('drops and restores the effective work-unit limit from warning signals', () => {
const clock = testClock();
const states: RateLimitWaitState[] = [];
const governor = new RateLimitGovernor(
createRateLimitGovernorConfig({ maxConcurrency: 6, minConcurrencyUnderPressure: 1 }),
{ now: clock.now, sleep: async () => undefined, random: () => 0 },
);
governor.subscribe((state) => states.push(state));
expect(governor.currentLimit()).toBe(6);
governor.report({
provider: 'claude-subscription',
status: 'warning',
utilization: 0.91,
rateLimitType: 'five_hour',
});
expect(governor.currentLimit()).toBe(1);
governor.report({
provider: 'claude-subscription',
status: 'allowed',
utilization: 0.2,
rateLimitType: 'five_hour',
});
expect(governor.currentLimit()).toBe(6);
expect(states.map((state) => state.kind)).toContain('concurrency_adjusted');
});
it('blocks work slots during a rejected reset window and emits wait states', async () => {
const clock = testClock();
const states: RateLimitWaitState[] = [];
const sleeps: number[] = [];
const governor = new RateLimitGovernor(
createRateLimitGovernorConfig({ maxConcurrency: 2, waitStateTickMs: 100 }),
{
now: clock.now,
random: () => 0,
sleep: async (ms) => {
sleeps.push(ms);
clock.advance(ms);
},
},
);
governor.subscribe((state) => states.push(state));
governor.report({ provider: 'anthropic-api', status: 'rejected', retryAfterMs: 250, rateLimitType: 'rpm' });
const release = await governor.acquireWorkSlot();
release();
expect(sleeps).toEqual([100, 100, 50]);
expect(states.some((state) => state.kind === 'wait_started' && state.provider === 'anthropic-api')).toBe(true);
expect(states.some((state) => state.kind === 'wait_finished' && state.provider === 'anthropic-api')).toBe(true);
});
it('rejects an interrupted wait without consuming a work slot', async () => {
const clock = testClock();
let abortListener: (() => void) | undefined;
const governor = new RateLimitGovernor(
createRateLimitGovernorConfig({ maxConcurrency: 1, waitStateTickMs: 100 }),
{
now: clock.now,
random: () => 0,
sleep: async (_ms, signal) =>
new Promise<void>((_resolve, reject) => {
abortListener = () => reject(new DOMException('Aborted', 'AbortError'));
signal?.addEventListener('abort', abortListener, { once: true });
}),
},
);
const controller = new AbortController();
governor.report({
provider: 'claude-subscription',
status: 'rejected',
resetAtMs: 2_000,
rateLimitType: 'five_hour',
});
const pending = governor.acquireWorkSlot(controller.signal);
controller.abort();
abortListener?.();
await expect(pending).rejects.toThrow(/Aborted/);
expect(governor.activeSlots()).toBe(0);
});
it('rejects an already-aborted ready wait', async () => {
const governor = new RateLimitGovernor(
createRateLimitGovernorConfig({ maxConcurrency: 1 }),
{ sleep: async () => undefined, random: () => 0 },
);
const controller = new AbortController();
controller.abort();
await expect(governor.waitForReady(controller.signal)).rejects.toThrow(/Aborted/);
});
it('rejects an already-aborted work slot without consuming capacity', async () => {
const governor = new RateLimitGovernor(
createRateLimitGovernorConfig({ maxConcurrency: 1 }),
{ sleep: async () => undefined, random: () => 0 },
);
const controller = new AbortController();
controller.abort();
await expect(governor.acquireWorkSlot(controller.signal)).rejects.toThrow(/Aborted/);
expect(governor.activeSlots()).toBe(0);
});
it('uses bounded opaque backoff for rejected signals without reset hints', async () => {
const clock = testClock();
const sleeps: number[] = [];
const governor = new RateLimitGovernor(
createRateLimitGovernorConfig({
maxConcurrency: 1,
retry: { maxAttempts: 3, baseDelayMs: 1_000, maxDelayMs: 60_000, jitter: false },
}),
{
now: clock.now,
random: () => 0,
sleep: async (ms) => {
sleeps.push(ms);
clock.advance(ms);
},
},
);
governor.report({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' });
const release1 = await governor.acquireWorkSlot();
release1();
governor.report({ provider: 'codex', status: 'rejected', rateLimitType: 'opaque' });
const release2 = await governor.acquireWorkSlot();
release2();
expect(sleeps).toEqual([1_000, 2_000]);
});
it('exposes the configured retry budget and disables outer retries when pacing is off', () => {
const retry = { maxAttempts: 3, baseDelayMs: 1_000, maxDelayMs: 60_000, jitter: false };
const enabled = new RateLimitGovernor(createRateLimitGovernorConfig({ retry }));
expect(enabled.maxRetryAttempts()).toBe(3);
const disabled = new RateLimitGovernor(createRateLimitGovernorConfig({ enabled: false, retry }));
expect(disabled.maxRetryAttempts()).toBe(1);
});
it('emits visible wait ticks after a rejected report without a waiting caller', async () => {
const clock = testClock();
const states: RateLimitWaitState[] = [];
const sleeps: number[] = [];
const governor = new RateLimitGovernor(
createRateLimitGovernorConfig({ maxConcurrency: 4, minConcurrencyUnderPressure: 1, waitStateTickMs: 100 }),
{
now: clock.now,
random: () => 0,
sleep: async (ms, signal) => {
if (signal?.aborted) {
throw new DOMException('Aborted', 'AbortError');
}
sleeps.push(ms);
clock.advance(ms);
},
},
);
governor.subscribe((state) => states.push(state));
governor.report({
provider: 'claude-subscription',
status: 'rejected',
resetAtMs: 1_250,
rateLimitType: 'five_hour',
});
await flushMicrotasks();
expect(sleeps).toEqual([100, 100, 50]);
expect(states).toContainEqual(
expect.objectContaining({
kind: 'wait_started',
provider: 'claude-subscription',
rateLimitType: 'five_hour',
remainingMs: 250,
}),
);
expect(states.filter((state) => state.kind === 'wait_tick')).toHaveLength(3);
expect(states).toContainEqual(
expect.objectContaining({
kind: 'wait_finished',
provider: 'claude-subscription',
rateLimitType: 'five_hour',
remainingMs: 0,
}),
);
});
it('does not duplicate countdown sleeps when a work slot waits during the same pause', async () => {
const clock = testClock();
const states: RateLimitWaitState[] = [];
const sleeps: number[] = [];
const governor = new RateLimitGovernor(
createRateLimitGovernorConfig({ maxConcurrency: 2, waitStateTickMs: 100 }),
{
now: clock.now,
random: () => 0,
sleep: async (ms, signal) => {
if (signal?.aborted) {
throw new DOMException('Aborted', 'AbortError');
}
sleeps.push(ms);
clock.advance(ms);
},
},
);
governor.subscribe((state) => states.push(state));
governor.report({ provider: 'anthropic-api', status: 'rejected', retryAfterMs: 250, rateLimitType: 'rpm' });
const pendingRelease = governor.acquireWorkSlot();
await flushMicrotasks();
const release = await pendingRelease;
release();
expect(sleeps).toEqual([100, 100, 50]);
expect(states.filter((state) => state.kind === 'wait_tick')).toHaveLength(3);
expect(governor.activeSlots()).toBe(0);
});
it('stops the visible wait ticker when the last subscriber unsubscribes', async () => {
const clock = testClock();
let abortCount = 0;
const governor = new RateLimitGovernor(
createRateLimitGovernorConfig({ maxConcurrency: 1, waitStateTickMs: 100 }),
{
now: clock.now,
random: () => 0,
sleep: async (_ms, signal) =>
new Promise<void>((_resolve, reject) => {
signal?.addEventListener(
'abort',
() => {
abortCount += 1;
reject(new DOMException('Aborted', 'AbortError'));
},
{ once: true },
);
}),
},
);
const unsubscribe = governor.subscribe(() => undefined);
governor.report({ provider: 'claude-subscription', status: 'rejected', retryAfterMs: 1_000 });
await flushMicrotasks(1);
unsubscribe();
await flushMicrotasks(1);
expect(abortCount).toBe(1);
});
});

View file

@ -50,6 +50,17 @@ connections:
maxConcurrency: 1,
failureMode: 'continue',
},
rateLimit: {
enabled: true,
throttleThreshold: 0.8,
minConcurrencyUnderPressure: 1,
retry: {
maxAttempts: 6,
baseDelayMs: 1_000,
maxDelayMs: 60_000,
jitter: true,
},
},
profile: false,
},
agent: {
@ -163,6 +174,52 @@ ingest:
expect(parseKtxProjectConfig('ingest:\n profile: json\n').ingest.profile).toBe('json');
});
it('defaults ingest rate-limit settings', () => {
const config = buildDefaultKtxProjectConfig();
expect(config.ingest.rateLimit).toEqual({
enabled: true,
throttleThreshold: 0.8,
minConcurrencyUnderPressure: 1,
retry: {
maxAttempts: 6,
baseDelayMs: 1_000,
maxDelayMs: 60_000,
jitter: true,
},
});
});
it('validates ingest rate-limit retry settings', () => {
const config = parseKtxProjectConfig(`
llm:
provider:
backend: none
ingest:
rateLimit:
enabled: true
throttleThreshold: 0.7
minConcurrencyUnderPressure: 2
maxWaitMs: 300000
retry:
maxAttempts: 4
baseDelayMs: 500
maxDelayMs: 30000
jitter: false
`);
expect(config.ingest.rateLimit).toEqual({
enabled: true,
throttleThreshold: 0.7,
minConcurrencyUnderPressure: 2,
maxWaitMs: 300_000,
retry: {
maxAttempts: 4,
baseDelayMs: 500,
maxDelayMs: 30_000,
jitter: false,
},
});
});
it('parses global Vertex LLM config', () => {
const config = parseKtxProjectConfig(`
llm:

View file

@ -34,6 +34,17 @@ describe('buildProjectStackSnapshotFields', () => {
adapters: [],
embeddings: { backend: 'sentence-transformers', dimensions: 384 },
workUnits: { stepBudget: 40, maxConcurrency: 1, failureMode: 'continue' },
rateLimit: {
enabled: true,
throttleThreshold: 0.8,
minConcurrencyUnderPressure: 1,
retry: {
maxAttempts: 6,
baseDelayMs: 1_000,
maxDelayMs: 60_000,
jitter: true,
},
},
profile: false,
},
llm: { provider: { backend: 'none' }, models: {}, promptCaching: {} },