feat: add codex llm backend

This commit is contained in:
Andrey Avtomonov 2026-06-01 17:22:24 +02:00
parent 21744fc520
commit 64b8a416fe
28 changed files with 1462 additions and 14 deletions

View file

@ -0,0 +1,144 @@
import type { LlmTokenUsage, RunLoopStopReason } from './runtime-port.js';
export interface CodexExecEventSummary {
finalText: string;
stopReason: RunLoopStopReason;
usage: LlmTokenUsage;
stepCount: number;
stepBoundariesMs: number[];
toolCallCount: number;
toolFailures: string[];
error?: Error;
}
interface CodexEventParseOptions {
startedAt?: number;
now?: () => number;
}
function record(value: unknown): Record<string, unknown> | undefined {
return value && typeof value === 'object' ? (value as Record<string, unknown>) : undefined;
}
function text(value: unknown): string | undefined {
return typeof value === 'string' && value.trim().length > 0 ? value : undefined;
}
function numberValue(value: unknown): number | undefined {
return typeof value === 'number' && Number.isFinite(value) ? value : undefined;
}
function usageFrom(value: unknown): LlmTokenUsage {
const usage = record(value);
if (!usage) {
return {};
}
const inputTokens = numberValue(usage.input_tokens ?? usage.inputTokens);
const outputTokens = numberValue(usage.output_tokens ?? usage.outputTokens);
const totalTokens = numberValue(usage.total_tokens ?? usage.totalTokens);
return {
...(inputTokens !== undefined ? { inputTokens } : {}),
...(outputTokens !== undefined ? { outputTokens } : {}),
...(totalTokens !== undefined ? { totalTokens } : {}),
};
}
function stopReasonFrom(value: unknown): RunLoopStopReason {
const reason = text(value)?.toLowerCase();
if (reason && /(budget|max_turn|max-turn|limit)/.test(reason)) {
return 'budget';
}
return 'natural';
}
function errorMessageFrom(value: unknown): string {
if (value instanceof Error) {
return value.message;
}
const asRecord = record(value);
const message = text(asRecord?.message);
return message ?? text(value) ?? 'Codex turn failed';
}
/** @internal */
export function parseCodexExecEventLine(line: string): unknown {
try {
return JSON.parse(line) as unknown;
} catch (error) {
throw new Error(`Codex JSONL event stream was malformed: ${error instanceof Error ? error.message : String(error)}`);
}
}
export function summarizeCodexExecEvents(
events: Iterable<unknown>,
options: CodexEventParseOptions = {},
): CodexExecEventSummary {
const startedAt = options.startedAt ?? Date.now();
const now = options.now ?? Date.now;
let finalText = '';
let stopReason: RunLoopStopReason = 'natural';
let usage: LlmTokenUsage = {};
let stepCount = 0;
const stepBoundariesMs: number[] = [];
let toolCallCount = 0;
const toolFailures: string[] = [];
let error: Error | undefined;
for (const event of events) {
const eventRecord = record(event);
const eventType = text(eventRecord?.type);
if (!eventRecord || !eventType) {
continue;
}
if (eventType === 'turn.started') {
stepCount += 1;
continue;
}
if (eventType === 'turn.completed') {
usage = usageFrom(eventRecord.usage);
stepBoundariesMs.push(now() - startedAt);
stopReason = stopReasonFrom(eventRecord.reason ?? eventRecord.stop_reason ?? eventRecord.terminal_reason);
continue;
}
if (eventType === 'turn.failed' || eventType === 'error') {
stopReason = 'error';
error = new Error(errorMessageFrom(eventRecord.error ?? eventRecord.message));
continue;
}
const item = record(eventRecord.item);
const itemType = text(item?.type);
if (!item || !itemType) {
continue;
}
if (eventType === 'item.completed' && itemType === 'agent_message') {
finalText = text(item.text) ?? finalText;
continue;
}
if (eventType === 'item.started' && itemType === 'mcp_tool_call') {
toolCallCount += 1;
continue;
}
if (eventType === 'item.completed' && itemType === 'mcp_tool_call' && item.error !== undefined) {
const name = text(item.name) ?? text(item.tool_name) ?? 'unknown';
toolFailures.push(`${name}: ${errorMessageFrom(item.error)}`);
}
}
return {
finalText,
stopReason,
usage,
stepCount,
stepBoundariesMs,
toolCallCount,
toolFailures,
...(error ? { error } : {}),
};
}

View file

@ -0,0 +1,87 @@
import { randomBytes } from 'node:crypto';
import type { Server } from 'node:http';
import { McpServer } from '@modelcontextprotocol/sdk/server/mcp.js';
import type { KtxMcpServerLike } from '../mcp/types.js';
import { runKtxMcpHttpServer, type KtxMcpHttpServerHandle } from '../../mcp-http-server.js';
import type { KtxRuntimeToolSet } from './runtime-port.js';
import { normalizeKtxRuntimeToolOutput } from './runtime-tools.js';
/** @internal */
export interface CreateCodexRuntimeMcpServerInput {
server?: KtxMcpServerLike;
toolSet: KtxRuntimeToolSet;
}
export interface CodexRuntimeMcpServerHandle {
url: string;
bearerTokenEnvVar: 'KTX_CODEX_RUNTIME_MCP_TOKEN';
bearerToken: string;
close(): Promise<void>;
}
type RunServer = typeof runKtxMcpHttpServer;
export interface StartCodexRuntimeMcpServerInput {
projectDir: string;
toolSet: KtxRuntimeToolSet;
runServer?: RunServer;
}
/** @internal */
export function createCodexRuntimeMcpServer(input: CreateCodexRuntimeMcpServerInput): KtxMcpServerLike {
const server =
input.server ??
(new McpServer({
name: 'ktx-runtime',
version: '0.0.0',
}) as KtxMcpServerLike);
for (const descriptor of Object.values(input.toolSet)) {
server.registerTool(
descriptor.name,
{
description: descriptor.description,
inputSchema: descriptor.inputSchema.shape,
},
async (toolInput) => {
const normalized = normalizeKtxRuntimeToolOutput(await descriptor.execute(toolInput));
return {
content: [{ type: 'text', text: normalized.markdown }],
...(normalized.structured !== undefined && normalized.structured !== null && typeof normalized.structured === 'object'
? { structuredContent: normalized.structured as object }
: {}),
};
},
);
}
return server;
}
function serverPort(server: Server, fallback: number): number {
const address = server.address();
return typeof address === 'object' && address ? address.port : fallback;
}
export async function startCodexRuntimeMcpServer(
input: StartCodexRuntimeMcpServerInput,
): Promise<CodexRuntimeMcpServerHandle> {
const bearerToken = randomBytes(32).toString('hex');
const runServer = input.runServer ?? runKtxMcpHttpServer;
const handle = (await runServer({
projectDir: input.projectDir,
host: '127.0.0.1',
port: 0,
token: bearerToken,
allowedHosts: ['127.0.0.1', 'localhost'],
allowedOrigins: [],
createMcpServer: () => createCodexRuntimeMcpServer({ toolSet: input.toolSet }) as McpServer,
})) as KtxMcpHttpServerHandle;
const port = serverPort(handle.server, 0);
return {
url: `http://127.0.0.1:${port}/mcp`,
bearerTokenEnvVar: 'KTX_CODEX_RUNTIME_MCP_TOKEN',
bearerToken,
close: () => handle.close(),
};
}

View file

@ -0,0 +1,20 @@
const DEFAULT_CODEX_MODEL = 'gpt-5.3-codex';
const CODEX_MODEL_ALIASES: Record<string, string> = {
codex: DEFAULT_CODEX_MODEL,
default: DEFAULT_CODEX_MODEL,
};
const EXPLICIT_CODEX_MODEL_ID = /^(?:gpt|codex)-[a-z0-9][a-z0-9._-]*$/i;
export function resolveCodexModel(model: string): string {
const normalized = model.trim();
const alias = CODEX_MODEL_ALIASES[normalized];
if (alias) {
return alias;
}
if (EXPLICIT_CODEX_MODEL_ID.test(normalized)) {
return normalized;
}
throw new Error(`Unsupported Codex model "${model}". Use codex, default, or a gpt-* / codex-* model id.`);
}

View file

@ -0,0 +1,41 @@
interface CodexRuntimeMcpConfig {
url: string;
bearerTokenEnvVar: string;
bearerToken: string;
toolNames: string[];
}
export interface BuildCodexRuntimeConfigInput {
model: string;
mcp?: CodexRuntimeMcpConfig;
}
export interface CodexRuntimeConfig {
configOverrides: Record<string, unknown>;
env: NodeJS.ProcessEnv;
}
export function buildCodexRuntimeConfig(input: BuildCodexRuntimeConfigInput): CodexRuntimeConfig {
const configOverrides: Record<string, unknown> = {
model: input.model,
approval_policy: 'never',
sandbox_mode: 'read-only',
web_search: 'disabled',
history: { persistence: 'none' },
};
const env: NodeJS.ProcessEnv = {};
if (input.mcp) {
configOverrides.mcp_servers = {
ktx: {
url: input.mcp.url,
bearer_token_env_var: input.mcp.bearerTokenEnvVar,
enabled_tools: input.mcp.toolNames,
required: true,
},
};
env[input.mcp.bearerTokenEnvVar] = input.mcp.bearerToken;
}
return { configOverrides, env };
}

View file

@ -0,0 +1,264 @@
import { z } from 'zod';
import { noopLogger, type KtxLogger } from '../core/config.js';
import { summarizeCodexExecEvents, type CodexExecEventSummary } from './codex-exec-events.js';
import {
startCodexRuntimeMcpServer,
type CodexRuntimeMcpServerHandle,
} from './codex-mcp-runtime-server.js';
import { resolveCodexModel } from './codex-models.js';
import { buildCodexRuntimeConfig } from './codex-runtime-config.js';
import { CodexSdkCliRunner, type CodexSdkRunner } from './codex-sdk-runner.js';
import type {
KtxGenerateObjectInput,
KtxGenerateTextInput,
KtxLlmRuntimePort,
KtxRuntimeToolSet,
LlmTokenUsage,
RunLoopParams,
RunLoopResult,
} from './runtime-port.js';
export interface CodexKtxLlmRuntimeDeps {
projectDir: string;
modelSlots: { default: string } & Partial<Record<string, string>>;
runner?: CodexSdkRunner;
startMcpServer?: (input: { projectDir: string; toolSet: KtxRuntimeToolSet }) => Promise<CodexRuntimeMcpServerHandle>;
logger?: KtxLogger;
}
function modelForRole(modelSlots: CodexKtxLlmRuntimeDeps['modelSlots'], role: string): string {
return resolveCodexModel(modelSlots[role] ?? modelSlots.default);
}
function promptWithSystem(system: string | undefined, prompt: string): string {
return [system, prompt].filter(Boolean).join('\n\n');
}
async function collectEvents(events: AsyncIterable<unknown>): Promise<unknown[]> {
const collected: unknown[] = [];
for await (const event of events) {
collected.push(event);
}
return collected;
}
function metrics(summary: CodexExecEventSummary, startedAt: number): { totalMs: number; usage: LlmTokenUsage } {
return { totalMs: Date.now() - startedAt, usage: summary.usage };
}
function assertSuccessfulText(summary: CodexExecEventSummary): string {
if (summary.error) {
throw summary.error;
}
if (!summary.finalText.trim()) {
throw new Error('Codex completed without an agent message');
}
return summary.finalText;
}
function parseStructuredOutput<TOutput, TSchema extends z.ZodType<TOutput>>(schema: TSchema, text: string): TOutput {
try {
return schema.parse(JSON.parse(text));
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
throw new Error(`Codex structured output failed validation: ${message}`);
}
}
async function mcpForTools(input: {
projectDir: string;
toolSet?: KtxRuntimeToolSet;
startMcpServer: CodexKtxLlmRuntimeDeps['startMcpServer'];
}): Promise<CodexRuntimeMcpServerHandle | undefined> {
if (!input.toolSet || Object.keys(input.toolSet).length === 0) {
return undefined;
}
return (input.startMcpServer ?? startCodexRuntimeMcpServer)({
projectDir: input.projectDir,
toolSet: input.toolSet,
});
}
export class CodexKtxLlmRuntime implements KtxLlmRuntimePort {
private readonly runner: CodexSdkRunner;
private readonly logger: KtxLogger;
constructor(private readonly deps: CodexKtxLlmRuntimeDeps) {
this.runner = deps.runner ?? new CodexSdkCliRunner();
this.logger = deps.logger ?? noopLogger;
}
async generateText(input: KtxGenerateTextInput): Promise<string> {
const startedAt = Date.now();
const model = modelForRole(this.deps.modelSlots, input.role);
const mcp = await mcpForTools({
projectDir: this.deps.projectDir,
toolSet: input.tools,
startMcpServer: this.deps.startMcpServer,
});
try {
const config = buildCodexRuntimeConfig({
model,
...(mcp
? {
mcp: {
url: mcp.url,
bearerTokenEnvVar: mcp.bearerTokenEnvVar,
bearerToken: mcp.bearerToken,
toolNames: Object.keys(input.tools ?? {}),
},
}
: {}),
});
const events = await collectEvents(
await this.runner.runStreamed({
projectDir: this.deps.projectDir,
model,
prompt: promptWithSystem(input.system, input.prompt),
configOverrides: config.configOverrides,
env: config.env,
}),
);
const summary = summarizeCodexExecEvents(events, { startedAt });
input.onMetrics?.(metrics(summary, startedAt));
return assertSuccessfulText(summary);
} finally {
await mcp?.close();
}
}
async generateObject<TOutput, TSchema extends z.ZodType<TOutput>>(
input: KtxGenerateObjectInput<TOutput, TSchema>,
): Promise<TOutput> {
const startedAt = Date.now();
const model = modelForRole(this.deps.modelSlots, input.role);
const mcp = await mcpForTools({
projectDir: this.deps.projectDir,
toolSet: input.tools,
startMcpServer: this.deps.startMcpServer,
});
try {
const config = buildCodexRuntimeConfig({
model,
...(mcp
? {
mcp: {
url: mcp.url,
bearerTokenEnvVar: mcp.bearerTokenEnvVar,
bearerToken: mcp.bearerToken,
toolNames: Object.keys(input.tools ?? {}),
},
}
: {}),
});
const events = await collectEvents(
await this.runner.runStreamed({
projectDir: this.deps.projectDir,
model,
prompt: promptWithSystem(input.system, input.prompt),
configOverrides: config.configOverrides,
env: config.env,
outputSchema: z.toJSONSchema(input.schema, { target: 'draft-7' }) as Record<string, unknown>,
}),
);
const summary = summarizeCodexExecEvents(events, { startedAt });
input.onMetrics?.(metrics(summary, startedAt));
return parseStructuredOutput(input.schema, assertSuccessfulText(summary));
} finally {
await mcp?.close();
}
}
async runAgentLoop(params: RunLoopParams): Promise<RunLoopResult> {
const startedAt = Date.now();
const model = modelForRole(this.deps.modelSlots, params.modelRole);
let mcp: CodexRuntimeMcpServerHandle | undefined;
try {
mcp = await mcpForTools({
projectDir: this.deps.projectDir,
toolSet: params.toolSet,
startMcpServer: this.deps.startMcpServer,
});
const config = buildCodexRuntimeConfig({
model,
...(mcp
? {
mcp: {
url: mcp.url,
bearerTokenEnvVar: mcp.bearerTokenEnvVar,
bearerToken: mcp.bearerToken,
toolNames: Object.keys(params.toolSet),
},
}
: {}),
});
const events = await collectEvents(
await this.runner.runStreamed({
projectDir: this.deps.projectDir,
model,
prompt: promptWithSystem(params.systemPrompt, params.userPrompt),
configOverrides: config.configOverrides,
env: config.env,
}),
);
const summary = summarizeCodexExecEvents(events, { startedAt });
for (let index = 1; index <= summary.stepCount; index += 1) {
try {
await params.onStepFinish?.({ stepIndex: index, stepBudget: params.stepBudget });
} catch (error) {
this.logger.warn(
`[codex-runner] onStepFinish callback threw; ignoring: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
return {
stopReason: summary.stopReason,
...(summary.stopReason === 'error' && summary.error ? { error: summary.error } : {}),
metrics: {
totalMs: Date.now() - startedAt,
usage: summary.usage,
stepCount: summary.stepCount,
stepBoundariesMs: summary.stepBoundariesMs,
},
};
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
return {
stopReason: 'error',
error: err,
metrics: { totalMs: Date.now() - startedAt, usage: {}, stepCount: 0, stepBoundariesMs: [] },
};
} finally {
await mcp?.close();
}
}
}
export async function runCodexAuthProbe(input: {
projectDir: string;
model: string;
runner?: CodexSdkRunner;
}): Promise<{ ok: true } | { ok: false; message: string }> {
let model: string;
try {
model = resolveCodexModel(input.model);
} catch (error) {
return { ok: false, message: error instanceof Error ? error.message : String(error) };
}
const runtime = new CodexKtxLlmRuntime({
projectDir: input.projectDir,
modelSlots: { default: model },
...(input.runner ? { runner: input.runner } : {}),
});
try {
await runtime.generateText({ role: 'default', prompt: 'Reply with exactly: ok' });
return { ok: true };
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
return {
ok: false,
message: `Codex authentication is not usable. Authenticate Codex locally with the Codex CLI, verify the Codex CLI is installed, then rerun setup or the command. ${message}`,
};
}
}

View file

@ -0,0 +1,74 @@
import { Codex } from '@openai/codex-sdk';
export interface CodexSdkRunnerInput {
projectDir: string;
model: string;
prompt: string;
configOverrides?: Record<string, unknown>;
env?: NodeJS.ProcessEnv;
outputSchema?: Record<string, unknown>;
}
export interface CodexSdkRunner {
runStreamed(input: CodexSdkRunnerInput): Promise<AsyncIterable<unknown>>;
}
type CodexThread = {
runStreamed(input: string, turnOptions?: { outputSchema?: Record<string, unknown> }): Promise<{ events: AsyncIterable<unknown> }>;
};
type CodexClient = {
startThread(options: { workingDirectory: string; skipGitRepoCheck: true }): CodexThread;
};
type CodexConstructor = new (options?: { config?: Record<string, unknown> }) => CodexClient;
function applyRunnerEnv(env: NodeJS.ProcessEnv | undefined): () => void {
if (!env) {
return () => undefined;
}
const previous = new Map<string, string | undefined>();
for (const [key, value] of Object.entries(env)) {
previous.set(key, process.env[key]);
if (value === undefined) {
delete process.env[key];
} else {
process.env[key] = value;
}
}
return () => {
for (const [key, value] of previous) {
if (value === undefined) {
delete process.env[key];
} else {
process.env[key] = value;
}
}
};
}
export class CodexSdkCliRunner implements CodexSdkRunner {
async runStreamed(input: CodexSdkRunnerInput): Promise<AsyncIterable<unknown>> {
const restoreEnv = applyRunnerEnv(input.env);
try {
const CodexClass = Codex as CodexConstructor;
const codex = new CodexClass({
config: {
...(input.configOverrides ?? {}),
model: input.model,
},
});
const thread = codex.startThread({
workingDirectory: input.projectDir,
skipGitRepoCheck: true,
});
const streamed = await thread.runStreamed(
input.prompt,
input.outputSchema ? { outputSchema: input.outputSchema } : undefined,
);
return streamed.events;
} finally {
restoreEnv();
}
}
}

View file

@ -5,6 +5,7 @@ import { resolveKtxConfigReference } from '../core/config-reference.js';
import type { KtxProjectEmbeddingConfig, KtxProjectLlmConfig } from '../project/config.js';
import { AiSdkKtxLlmRuntime } from './ai-sdk-runtime.js';
import { ClaudeCodeKtxLlmRuntime } from './claude-code-runtime.js';
import { CodexKtxLlmRuntime } from './codex-runtime.js';
import type { KtxLlmRuntimePort } from './runtime-port.js';
interface LocalConfigDeps {
@ -13,6 +14,7 @@ interface LocalConfigDeps {
createKtxLlmProvider?: typeof createKtxLlmProvider;
createKtxEmbeddingProvider?: typeof createKtxEmbeddingProvider;
createClaudeCodeRuntime?: (deps: ConstructorParameters<typeof ClaudeCodeKtxLlmRuntime>[0]) => KtxLlmRuntimePort;
createCodexRuntime?: (deps: ConstructorParameters<typeof CodexKtxLlmRuntime>[0]) => KtxLlmRuntimePort;
createAiSdkRuntime?: (deps: { llmProvider: KtxLlmProvider }) => KtxLlmRuntimePort;
}
@ -104,7 +106,7 @@ export function createLocalKtxLlmProviderFromConfig(
deps: LocalConfigDeps = {},
): KtxLlmProvider | null {
const resolved = resolveLocalKtxLlmConfig(config, deps.env ?? process.env);
if (!resolved || resolved.backend === 'claude-code') {
if (!resolved || resolved.backend === 'claude-code' || resolved.backend === 'codex') {
return null;
}
return (deps.createKtxLlmProvider ?? createKtxLlmProvider)(resolved);
@ -129,6 +131,16 @@ export function createLocalKtxLlmRuntimeFromConfig(
env: deps.env,
});
}
if (resolved.backend === 'codex') {
const projectDir = deps.projectDir;
if (!projectDir) {
throw new Error('projectDir is required when creating the codex LLM runtime');
}
return (deps.createCodexRuntime ?? ((runtimeDeps) => new CodexKtxLlmRuntime(runtimeDeps)))({
projectDir,
modelSlots: resolved.modelSlots,
});
}
const llmProvider = (deps.createKtxLlmProvider ?? createKtxLlmProvider)(resolved);
return (deps.createAiSdkRuntime ?? ((runtimeDeps) => new AiSdkKtxLlmRuntime(runtimeDeps)))({ llmProvider });
}