import { z } from 'zod'; import { noopLogger, type KtxLogger } from '../core/config.js'; import { isCompletedAgentStep, summarizeCodexExecEvents, type CodexExecEventSummary } from './codex-exec-events.js'; import { startCodexRuntimeMcpServer, type CodexRuntimeMcpServerHandle, } from './codex-mcp-runtime-server.js'; import { resolveCodexModel } from './codex-models.js'; import { buildCodexRuntimeConfig } from './codex-runtime-config.js'; import { CodexSdkCliRunner, type CodexSdkRunner } from './codex-sdk-runner.js'; import type { KtxGenerateObjectInput, KtxGenerateTextInput, KtxLlmRuntimePort, KtxRuntimeToolSet, LlmTokenUsage, RunLoopParams, RunLoopResult, } from './runtime-port.js'; export interface CodexKtxLlmRuntimeDeps { projectDir: string; modelSlots: { default: string } & Partial>; runner?: CodexSdkRunner; startMcpServer?: (input: { projectDir: string; toolSet: KtxRuntimeToolSet }) => Promise; logger?: KtxLogger; } function modelForRole(modelSlots: CodexKtxLlmRuntimeDeps['modelSlots'], role: string): string { return resolveCodexModel(modelSlots[role] ?? modelSlots.default); } function promptWithSystem(system: string | undefined, prompt: string): string { return [system, prompt].filter(Boolean).join('\n\n'); } interface CollectCodexEventsOptions { stepBudget?: number; abortController?: AbortController; onStep?: (stepIndex: number) => void | Promise; } interface CollectCodexEventsResult { events: unknown[]; budgetExceeded: boolean; streamError?: Error; } function eventRecord(value: unknown): Record | undefined { return value && typeof value === 'object' ? (value as Record) : undefined; } function isTurnCompleted(event: unknown): boolean { return eventRecord(event)?.type === 'turn.completed'; } /** * Drains the Codex stream once, emitting a step as each agent action completes * so callers see live progress and the step budget is enforced mid-run. Every * completed agent-action item counts (see {@link isCompletedAgentStep}), so * built-in `command_execution` steps decrement the budget the same as * `mcp_tool_call`s. A turn that produced no actions still counts as one step, * matching the metrics summary and the AI SDK backend. */ async function collectEvents( events: AsyncIterable, options: CollectCodexEventsOptions = {}, ): Promise { const collected: unknown[] = []; let completedSteps = 0; let sawActionStep = false; let budgetExceeded = false; let streamError: Error | undefined; // The SDK yields every stdout event, then throws on a non-zero codex exec // exit. Catch that throw so the events already collected (which carry the // real `turn.failed`/`error` reason) survive for the summary; the masked // exit message is kept only as a fallback when no error event was emitted. try { for await (const event of events) { collected.push(event); const isActionStep = isCompletedAgentStep(event); if (isActionStep) { sawActionStep = true; } else if (sawActionStep || !isTurnCompleted(event)) { // Only fall back to counting a bare turn as a step when the turn produced // no agent actions; a completed turn is terminal, so it never aborts. continue; } completedSteps += 1; await options.onStep?.(completedSteps); if (isActionStep && options.stepBudget !== undefined && completedSteps >= options.stepBudget) { budgetExceeded = true; options.abortController?.abort(); break; } } } catch (error) { streamError = error instanceof Error ? error : new Error(String(error)); } return { events: collected, budgetExceeded, ...(streamError ? { streamError } : {}) }; } function metrics(summary: CodexExecEventSummary, startedAt: number): { totalMs: number; usage: LlmTokenUsage } { return { totalMs: Date.now() - startedAt, usage: summary.usage }; } function summaryError(summary: CodexExecEventSummary, streamError?: Error): Error | undefined { // A `turn.failed`/`error` event carries the real reason; prefer it over the // SDK's generic non-zero-exit throw. Fall back to the stream error only when // no event explained the failure (e.g. spawn failure or auth before a turn). if (summary.error) { return summary.error; } if (summary.toolFailures.length > 0) { return new Error(`Codex runtime tool call failed: ${summary.toolFailures.join('; ')}`); } return streamError; } function assertSuccessfulText(summary: CodexExecEventSummary, streamError?: Error): string { const error = summaryError(summary, streamError); if (error) { throw error; } if (!summary.finalText.trim()) { throw new Error('Codex completed without an agent message'); } return summary.finalText; } function parseStructuredOutput>(schema: TSchema, text: string): TOutput { try { return schema.parse(JSON.parse(text)); } catch (error) { const message = error instanceof Error ? error.message : String(error); throw new Error(`Codex structured output failed validation: ${message}`); } } async function mcpForTools(input: { projectDir: string; toolSet?: KtxRuntimeToolSet; startMcpServer: CodexKtxLlmRuntimeDeps['startMcpServer']; }): Promise { if (!input.toolSet || Object.keys(input.toolSet).length === 0) { return undefined; } return (input.startMcpServer ?? startCodexRuntimeMcpServer)({ projectDir: input.projectDir, toolSet: input.toolSet, }); } function runtimeToolNames(toolSet: KtxRuntimeToolSet | undefined): string[] { return Object.values(toolSet ?? {}).map((descriptor) => descriptor.name); } export class CodexKtxLlmRuntime implements KtxLlmRuntimePort { private readonly runner: CodexSdkRunner; private readonly logger: KtxLogger; constructor(private readonly deps: CodexKtxLlmRuntimeDeps) { this.runner = deps.runner ?? new CodexSdkCliRunner(); this.logger = deps.logger ?? noopLogger; } async generateText(input: KtxGenerateTextInput): Promise { const startedAt = Date.now(); const model = modelForRole(this.deps.modelSlots, input.role); const mcp = await mcpForTools({ projectDir: this.deps.projectDir, toolSet: input.tools, startMcpServer: this.deps.startMcpServer, }); try { const config = buildCodexRuntimeConfig({ model, ...(mcp ? { mcp: { url: mcp.url, bearerTokenEnvVar: mcp.bearerTokenEnvVar, bearerToken: mcp.bearerToken, toolNames: runtimeToolNames(input.tools), }, } : {}), }); const collected = await collectEvents( await this.runner.runStreamed({ projectDir: this.deps.projectDir, model, prompt: promptWithSystem(input.system, input.prompt), configOverrides: config.configOverrides, env: config.env, }), ); const summary = summarizeCodexExecEvents(collected.events, { startedAt }); input.onMetrics?.(metrics(summary, startedAt)); return assertSuccessfulText(summary, collected.streamError); } finally { await mcp?.close(); } } async generateObject>( input: KtxGenerateObjectInput, ): Promise { const startedAt = Date.now(); const model = modelForRole(this.deps.modelSlots, input.role); const mcp = await mcpForTools({ projectDir: this.deps.projectDir, toolSet: input.tools, startMcpServer: this.deps.startMcpServer, }); try { const config = buildCodexRuntimeConfig({ model, ...(mcp ? { mcp: { url: mcp.url, bearerTokenEnvVar: mcp.bearerTokenEnvVar, bearerToken: mcp.bearerToken, toolNames: runtimeToolNames(input.tools), }, } : {}), }); const collected = await collectEvents( await this.runner.runStreamed({ projectDir: this.deps.projectDir, model, prompt: promptWithSystem(input.system, input.prompt), configOverrides: config.configOverrides, env: config.env, outputSchema: z.toJSONSchema(input.schema, { target: 'draft-7' }) as Record, }), ); const summary = summarizeCodexExecEvents(collected.events, { startedAt }); input.onMetrics?.(metrics(summary, startedAt)); return parseStructuredOutput(input.schema, assertSuccessfulText(summary, collected.streamError)); } finally { await mcp?.close(); } } async runAgentLoop(params: RunLoopParams): Promise { const startedAt = Date.now(); const model = modelForRole(this.deps.modelSlots, params.modelRole); let mcp: CodexRuntimeMcpServerHandle | undefined; try { mcp = await mcpForTools({ projectDir: this.deps.projectDir, toolSet: params.toolSet, startMcpServer: this.deps.startMcpServer, }); const config = buildCodexRuntimeConfig({ model, ...(mcp ? { mcp: { url: mcp.url, bearerTokenEnvVar: mcp.bearerTokenEnvVar, bearerToken: mcp.bearerToken, toolNames: runtimeToolNames(params.toolSet), }, } : {}), }); const abortController = new AbortController(); const onStep = async (stepIndex: number): Promise => { try { await params.onStepFinish?.({ stepIndex, stepBudget: params.stepBudget }); } catch (error) { this.logger.warn( `[codex-runner] onStepFinish callback threw; ignoring: ${error instanceof Error ? error.message : String(error)}`, ); } }; const collected = await collectEvents( await this.runner.runStreamed({ projectDir: this.deps.projectDir, model, prompt: promptWithSystem(params.systemPrompt, params.userPrompt), configOverrides: config.configOverrides, env: config.env, signal: abortController.signal, }), { stepBudget: params.stepBudget, abortController, onStep }, ); const summary = summarizeCodexExecEvents(collected.events, { startedAt }); const error = summaryError(summary, collected.streamError); const stopReason = collected.budgetExceeded ? 'budget' : error ? 'error' : summary.stopReason; return { stopReason, ...(stopReason === 'error' && error ? { error } : {}), metrics: { totalMs: Date.now() - startedAt, usage: summary.usage, stepCount: summary.stepCount, stepBoundariesMs: summary.stepBoundariesMs, }, }; } catch (error) { const err = error instanceof Error ? error : new Error(String(error)); return { stopReason: 'error', error: err, metrics: { totalMs: Date.now() - startedAt, usage: {}, stepCount: 0, stepBoundariesMs: [] }, }; } finally { await mcp?.close(); } } } // A rejected model is not an auth failure: Codex authenticated, connected, and // the API refused the model id. These markers come from the API error envelope // (e.g. "model is not supported", "invalid_request_error"). const MODEL_UNAVAILABLE_MARKERS = /\bnot supported\b|\bnot available\b|\bdoes not exist\b|invalid_request_error|\bunknown model\b|\bunsupported model\b/i; function describeCodexProbeFailure(model: string, message: string): { message: string; fix: string } { if (MODEL_UNAVAILABLE_MARKERS.test(message)) { const fix = `Run \`codex\` to see the models your account supports, then set llm.models.default in ktx.yaml (or rerun \`ktx setup\`).`; return { message: `Codex is authenticated, but the configured model "${model}" is not available for this Codex account. ${fix} Details: ${message}`, fix, }; } const fix = `Authenticate Codex locally with the Codex CLI, verify the Codex CLI is installed, then rerun setup or \`ktx status\`.`; return { message: `Codex authentication is not usable. ${fix} Details: ${message}`, fix, }; } export async function runCodexAuthProbe(input: { projectDir: string; model: string; runner?: CodexSdkRunner; }): Promise<{ ok: true } | { ok: false; message: string; fix: string }> { let model: string; try { model = resolveCodexModel(input.model); } catch (error) { return { ok: false, message: error instanceof Error ? error.message : String(error), fix: 'Set llm.models.default in ktx.yaml to a supported codex model (codex, default, or a gpt-* / codex-* id), or rerun `ktx setup`.', }; } const runtime = new CodexKtxLlmRuntime({ projectDir: input.projectDir, modelSlots: { default: model }, ...(input.runner ? { runner: input.runner } : {}), }); try { await runtime.generateText({ role: 'default', prompt: 'Reply with exactly: ok' }); return { ok: true }; } catch (error) { const message = error instanceof Error ? error.message : String(error); return { ok: false, ...describeCodexProbeFailure(model, message) }; } }