From 66100976590bff14c1a4b66f6a090c950a0aad3d Mon Sep 17 00:00:00 2001 From: Musa Date: Tue, 10 Mar 2026 20:54:14 -0700 Subject: [PATCH] Support for Codex via Plano (#808) * Add Codex CLI support; xAI response improvements * Add native Plano running check and update CLI agent error handling * adding PR suggestions for transformations and code quality * message extraction logic in ResponsesAPIRequest * xAI support for Responses API by routing to native endpoint + refactor code --- cli/planoai/core.py | 129 +++- cli/planoai/main.py | 47 +- cli/uv.lock | 2 +- crates/brightstaff/src/handlers/llm.rs | 8 +- crates/brightstaff/src/state/mod.rs | 99 +++ crates/hermesllm/src/apis/openai.rs | 2 +- crates/hermesllm/src/apis/openai_responses.rs | 199 +++++- .../responses_api_streaming_buffer.rs | 66 +- crates/hermesllm/src/clients/endpoints.rs | 19 +- crates/hermesllm/src/providers/id.rs | 21 +- crates/hermesllm/src/providers/request.rs | 76 +++ .../src/transforms/request/from_openai.rs | 645 +++++++++++++++--- .../response_streaming/to_openai_streaming.rs | 17 +- crates/llm_gateway/src/stream_context.rs | 3 +- demos/README.md | 1 + demos/llm_routing/codex_router/README.md | 92 +++ demos/llm_routing/codex_router/config.yaml | 38 ++ .../codex_router/pretty_model_resolution.sh | 33 + 18 files changed, 1297 insertions(+), 200 deletions(-) create mode 100644 demos/llm_routing/codex_router/README.md create mode 100644 demos/llm_routing/codex_router/config.yaml create mode 100644 demos/llm_routing/codex_router/pretty_model_resolution.sh diff --git a/cli/planoai/core.py b/cli/planoai/core.py index e9ddc7bd..174f37c0 100644 --- a/cli/planoai/core.py +++ b/cli/planoai/core.py @@ -10,7 +10,6 @@ from planoai.consts import ( PLANO_DOCKER_IMAGE, PLANO_DOCKER_NAME, ) -import subprocess from planoai.docker_cli import ( docker_container_status, docker_remove_container, @@ -147,26 +146,48 @@ def stop_docker_container(service=PLANO_DOCKER_NAME): log.info(f"Failed to shut down services: 
{str(e)}") -def start_cli_agent(plano_config_file=None, settings_json="{}"): - """Start a CLI client connected to Plano.""" - - with open(plano_config_file, "r") as file: - plano_config = file.read() - plano_config_yaml = yaml.safe_load(plano_config) - - # Get egress listener configuration - egress_config = plano_config_yaml.get("listeners", {}).get("egress_traffic", {}) - host = egress_config.get("host", "127.0.0.1") - port = egress_config.get("port", 12000) - - # Parse additional settings from command line +def _parse_cli_agent_settings(settings_json: str) -> dict: try: - additional_settings = json.loads(settings_json) if settings_json else {} + return json.loads(settings_json) if settings_json else {} except json.JSONDecodeError: log.error("Settings must be valid JSON") sys.exit(1) - # Set up environment variables + +def _resolve_cli_agent_endpoint(plano_config_yaml: dict) -> tuple[str, int]: + listeners = plano_config_yaml.get("listeners") + + if isinstance(listeners, dict): + egress_config = listeners.get("egress_traffic", {}) + host = egress_config.get("host") or egress_config.get("address") or "0.0.0.0" + port = egress_config.get("port", 12000) + return host, port + + if isinstance(listeners, list): + for listener in listeners: + if listener.get("type") in ["model", "model_listener"]: + host = listener.get("host") or listener.get("address") or "0.0.0.0" + port = listener.get("port", 12000) + return host, port + + return "0.0.0.0", 12000 + + +def _apply_non_interactive_env(env: dict, additional_settings: dict) -> None: + if additional_settings.get("NON_INTERACTIVE_MODE", False): + env.update( + { + "CI": "true", + "FORCE_COLOR": "0", + "NODE_NO_READLINE": "1", + "TERM": "dumb", + } + ) + + +def _start_claude_cli_agent( + host: str, port: int, plano_config_yaml: dict, additional_settings: dict +) -> None: env = os.environ.copy() env.update( { @@ -186,7 +207,6 @@ def start_cli_agent(plano_config_file=None, settings_json="{}"): "ANTHROPIC_SMALL_FAST_MODEL" ] 
else: - # Check if arch.claude.code.small.fast alias exists in model_aliases model_aliases = plano_config_yaml.get("model_aliases", {}) if "arch.claude.code.small.fast" in model_aliases: env["ANTHROPIC_SMALL_FAST_MODEL"] = "arch.claude.code.small.fast" @@ -196,23 +216,10 @@ def start_cli_agent(plano_config_file=None, settings_json="{}"): ) log.info("Or provide ANTHROPIC_SMALL_FAST_MODEL in --settings JSON") - # Non-interactive mode configuration from additional_settings only - if additional_settings.get("NON_INTERACTIVE_MODE", False): - env.update( - { - "CI": "true", - "FORCE_COLOR": "0", - "NODE_NO_READLINE": "1", - "TERM": "dumb", - } - ) + _apply_non_interactive_env(env, additional_settings) - # Build claude command arguments claude_args = [] - - # Add settings if provided, excluding those already handled as environment variables if additional_settings: - # Filter out settings that are already processed as environment variables claude_settings = { k: v for k, v in additional_settings.items() @@ -221,10 +228,8 @@ def start_cli_agent(plano_config_file=None, settings_json="{}"): if claude_settings: claude_args.append(f"--settings={json.dumps(claude_settings)}") - # Use claude from PATH claude_path = "claude" log.info(f"Connecting Claude Code Agent to Plano at {host}:{port}") - try: subprocess.run([claude_path] + claude_args, env=env, check=True) except subprocess.CalledProcessError as e: @@ -235,3 +240,61 @@ def start_cli_agent(plano_config_file=None, settings_json="{}"): f"{claude_path} not found. 
Make sure Claude Code is installed: npm install -g @anthropic-ai/claude-code" ) sys.exit(1) + + +def _start_codex_cli_agent(host: str, port: int, additional_settings: dict) -> None: + env = os.environ.copy() + env.update( + { + "OPENAI_API_KEY": "test", # Use test token for plano + "OPENAI_BASE_URL": f"http://{host}:{port}/v1", + "NO_PROXY": host, + "DISABLE_TELEMETRY": "true", + } + ) + _apply_non_interactive_env(env, additional_settings) + + codex_model = additional_settings.get("CODEX_MODEL", "gpt-5.3-codex") + codex_path = "codex" + codex_args = ["--model", codex_model] + + log.info( + f"Connecting Codex CLI Agent to Plano at {host}:{port} (default model: {codex_model})" + ) + try: + subprocess.run([codex_path] + codex_args, env=env, check=True) + except subprocess.CalledProcessError as e: + log.error(f"Error starting codex: {e}") + sys.exit(1) + except FileNotFoundError: + log.error( + f"{codex_path} not found. Make sure Codex CLI is installed: npm install -g @openai/codex" + ) + sys.exit(1) + + +def start_cli_agent( + plano_config_file=None, cli_agent_type="claude", settings_json="{}" +): + """Start a CLI client connected to Plano.""" + + with open(plano_config_file, "r") as file: + plano_config = file.read() + plano_config_yaml = yaml.safe_load(plano_config) + + host, port = _resolve_cli_agent_endpoint(plano_config_yaml) + + additional_settings = _parse_cli_agent_settings(settings_json) + + if cli_agent_type == "claude": + _start_claude_cli_agent(host, port, plano_config_yaml, additional_settings) + return + + if cli_agent_type == "codex": + _start_codex_cli_agent(host, port, additional_settings) + return + + log.error( + f"Unsupported cli agent type '{cli_agent_type}'. 
Supported values: claude, codex" + ) + sys.exit(1) diff --git a/cli/planoai/main.py b/cli/planoai/main.py index a63f294e..e9cdc0a0 100644 --- a/cli/planoai/main.py +++ b/cli/planoai/main.py @@ -1,3 +1,4 @@ +import json import os import multiprocessing import subprocess @@ -31,6 +32,7 @@ from planoai.trace_cmd import trace as trace_cmd, start_trace_listener_backgroun from planoai.consts import ( DEFAULT_OTEL_TRACING_GRPC_ENDPOINT, DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT, + NATIVE_PID_FILE, PLANO_DOCKER_IMAGE, PLANO_DOCKER_NAME, ) @@ -40,6 +42,30 @@ from planoai.versioning import check_version_status, get_latest_version, get_ver log = getLogger(__name__) +def _is_native_plano_running() -> bool: + if not os.path.exists(NATIVE_PID_FILE): + return False + try: + with open(NATIVE_PID_FILE, "r") as f: + pids = json.load(f) + except (OSError, json.JSONDecodeError): + return False + + envoy_pid = pids.get("envoy_pid") + brightstaff_pid = pids.get("brightstaff_pid") + if not isinstance(envoy_pid, int) or not isinstance(brightstaff_pid, int): + return False + + for pid in (envoy_pid, brightstaff_pid): + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + continue + return True + + def _is_port_in_use(port: int) -> bool: """Check if a TCP port is already bound on localhost.""" import socket @@ -523,7 +549,7 @@ def logs(debug, follow, docker): @click.command() -@click.argument("type", type=click.Choice(["claude"]), required=True) +@click.argument("type", type=click.Choice(["claude", "codex"]), required=True) @click.argument("file", required=False) # Optional file argument @click.option( "--path", default=".", help="Path to the directory containing plano_config.yaml" @@ -536,14 +562,19 @@ def logs(debug, follow, docker): def cli_agent(type, file, path, settings): """Start a CLI agent connected to Plano. 
- CLI_AGENT: The type of CLI agent to start (currently only 'claude' is supported) + CLI_AGENT: The type of CLI agent to start ('claude' or 'codex') """ - # Check if plano docker container is running - plano_status = docker_container_status(PLANO_DOCKER_NAME) - if plano_status != "running": - log.error(f"plano docker container is not running (status: {plano_status})") - log.error("Please start plano using the 'planoai up' command.") + native_running = _is_native_plano_running() + docker_running = False + if not native_running: + docker_running = docker_container_status(PLANO_DOCKER_NAME) == "running" + + if not (native_running or docker_running): + log.error("Plano is not running.") + log.error( + "Start Plano first using 'planoai up ' (native or --docker mode)." + ) sys.exit(1) # Determine plano_config.yaml path @@ -553,7 +584,7 @@ def cli_agent(type, file, path, settings): sys.exit(1) try: - start_cli_agent(plano_config_file, settings) + start_cli_agent(plano_config_file, type, settings) except SystemExit: # Re-raise SystemExit to preserve exit codes raise diff --git a/cli/uv.lock b/cli/uv.lock index 45ccf82e..9d85bf85 100644 --- a/cli/uv.lock +++ b/cli/uv.lock @@ -337,7 +337,7 @@ wheels = [ [[package]] name = "planoai" -version = "0.4.7" +version = "0.4.9" source = { editable = "." 
} dependencies = [ { name = "click" }, diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index b03d4d29..67afebff 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -198,6 +198,7 @@ async fn llm_chat_inner( let temperature = client_request.get_temperature(); let is_streaming_request = client_request.is_streaming(); let alias_resolved_model = resolve_model_alias(&model_from_request, &model_aliases); + let (provider_id, _) = get_provider_info(&llm_providers, &alias_resolved_model).await; // Validate that the requested model exists in configuration // This matches the validation in llm_gateway routing.rs @@ -249,7 +250,11 @@ async fn llm_chat_inner( if client_request.remove_metadata_key("plano_preference_config") { debug!("removed plano_preference_config from metadata"); } - + if let Some(ref client_api_kind) = client_api { + let upstream_api = + provider_id.compatible_api_for_client(client_api_kind, is_streaming_request); + client_request.normalize_for_upstream(provider_id, &upstream_api); + } // === v1/responses state management: Determine upstream API and combine input if needed === // Do this BEFORE routing since routing consumes the request // Only process state if state_storage is configured @@ -496,7 +501,6 @@ async fn llm_chat_inner( .into_response()), } } - /// Resolves model aliases by looking up the requested model in the model_aliases map. /// Returns the target model if an alias is found, otherwise returns the original model. 
fn resolve_model_alias( diff --git a/crates/brightstaff/src/state/mod.rs b/crates/brightstaff/src/state/mod.rs index ce3ec8ae..3d59f359 100644 --- a/crates/brightstaff/src/state/mod.rs +++ b/crates/brightstaff/src/state/mod.rs @@ -130,6 +130,7 @@ pub fn extract_input_items(input: &InputParam) -> Vec { }]), })] } + InputParam::SingleItem(item) => vec![item.clone()], InputParam::Items(items) => items.clone(), } } @@ -146,3 +147,101 @@ pub async fn retrieve_and_combine_input( let combined_input = storage.merge(&prev_state, current_input); Ok(combined_input) } + +#[cfg(test)] +mod tests { + use super::extract_input_items; + use hermesllm::apis::openai_responses::{ + InputContent, InputItem, InputMessage, InputParam, MessageContent, MessageRole, + }; + + #[test] + fn test_extract_input_items_converts_text_to_user_message_item() { + let extracted = extract_input_items(&InputParam::Text("hello world".to_string())); + assert_eq!(extracted.len(), 1); + + let InputItem::Message(message) = &extracted[0] else { + panic!("expected InputItem::Message"); + }; + assert!(matches!(message.role, MessageRole::User)); + + let MessageContent::Items(items) = &message.content else { + panic!("expected MessageContent::Items"); + }; + assert_eq!(items.len(), 1); + + let InputContent::InputText { text } = &items[0] else { + panic!("expected InputContent::InputText"); + }; + assert_eq!(text, "hello world"); + } + + #[test] + fn test_extract_input_items_preserves_single_item() { + let item = InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: MessageContent::Items(vec![InputContent::InputText { + text: "assistant note".to_string(), + }]), + }); + + let extracted = extract_input_items(&InputParam::SingleItem(item.clone())); + assert_eq!(extracted.len(), 1); + let InputItem::Message(message) = &extracted[0] else { + panic!("expected InputItem::Message"); + }; + assert!(matches!(message.role, MessageRole::Assistant)); + let MessageContent::Items(items) = &message.content 
else { + panic!("expected MessageContent::Items"); + }; + let InputContent::InputText { text } = &items[0] else { + panic!("expected InputContent::InputText"); + }; + assert_eq!(text, "assistant note"); + } + + #[test] + fn test_extract_input_items_preserves_items_list() { + let items = vec![ + InputItem::Message(InputMessage { + role: MessageRole::User, + content: MessageContent::Items(vec![InputContent::InputText { + text: "first".to_string(), + }]), + }), + InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: MessageContent::Items(vec![InputContent::InputText { + text: "second".to_string(), + }]), + }), + ]; + + let extracted = extract_input_items(&InputParam::Items(items.clone())); + assert_eq!(extracted.len(), items.len()); + + let InputItem::Message(first) = &extracted[0] else { + panic!("expected first item to be message"); + }; + assert!(matches!(first.role, MessageRole::User)); + let MessageContent::Items(first_items) = &first.content else { + panic!("expected MessageContent::Items"); + }; + let InputContent::InputText { text: first_text } = &first_items[0] else { + panic!("expected InputContent::InputText"); + }; + assert_eq!(first_text, "first"); + + let InputItem::Message(second) = &extracted[1] else { + panic!("expected second item to be message"); + }; + assert!(matches!(second.role, MessageRole::Assistant)); + let MessageContent::Items(second_items) = &second.content else { + panic!("expected MessageContent::Items"); + }; + let InputContent::InputText { text: second_text } = &second_items[0] else { + panic!("expected InputContent::InputText"); + }; + assert_eq!(second_text, "second"); + } +} diff --git a/crates/hermesllm/src/apis/openai.rs b/crates/hermesllm/src/apis/openai.rs index 53eee442..33f55b29 100644 --- a/crates/hermesllm/src/apis/openai.rs +++ b/crates/hermesllm/src/apis/openai.rs @@ -108,7 +108,7 @@ pub struct ChatCompletionsRequest { pub top_p: Option, pub top_logprobs: Option, pub user: Option, - // pub 
web_search: Option, // GOOD FIRST ISSUE: Future support for web search + pub web_search_options: Option, // VLLM-specific parameters (used by Arch-Function) pub top_k: Option, diff --git a/crates/hermesllm/src/apis/openai_responses.rs b/crates/hermesllm/src/apis/openai_responses.rs index 65f4dfa0..eac8a452 100644 --- a/crates/hermesllm/src/apis/openai_responses.rs +++ b/crates/hermesllm/src/apis/openai_responses.rs @@ -116,6 +116,8 @@ pub enum InputParam { Text(String), /// Array of input items (messages, references, outputs, etc.) Items(Vec), + /// Single input item (some clients send object instead of array) + SingleItem(InputItem), } /// Input item - can be a message, item reference, function call output, etc. @@ -130,12 +132,20 @@ pub enum InputItem { item_type: String, id: String, }, + /// Function call emitted by model in prior turn + FunctionCall { + #[serde(rename = "type")] + item_type: String, + name: String, + arguments: String, + call_id: String, + }, /// Function call output FunctionCallOutput { #[serde(rename = "type")] item_type: String, call_id: String, - output: String, + output: serde_json::Value, }, } @@ -166,6 +176,7 @@ pub enum MessageRole { Assistant, System, Developer, + Tool, } /// Input content types @@ -173,6 +184,7 @@ pub enum MessageRole { #[serde(tag = "type", rename_all = "snake_case")] pub enum InputContent { /// Text input + #[serde(rename = "input_text", alias = "text", alias = "output_text")] InputText { text: String }, /// Image input via URL InputImage { @@ -180,6 +192,7 @@ pub enum InputContent { detail: Option, }, /// File input via URL + #[serde(rename = "input_file", alias = "file")] InputFile { file_url: String }, /// Audio input InputAudio { @@ -207,10 +220,11 @@ pub struct AudioConfig { } /// Text configuration +#[skip_serializing_none] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TextConfig { /// Text format configuration - pub format: TextFormat, + pub format: Option, } /// Text format @@ -285,6 +299,7 @@ 
pub enum Tool { filters: Option, }, /// Web search tool + #[serde(rename = "web_search", alias = "web_search_preview")] WebSearchPreview { domains: Option>, search_context_size: Option, @@ -298,6 +313,12 @@ pub enum Tool { display_height_px: Option, display_number: Option, }, + /// Custom tool (provider/SDK-specific tool contract) + Custom { + name: Option, + description: Option, + format: Option, + }, } /// Ranking options for file search @@ -1015,6 +1036,30 @@ pub struct ListInputItemsResponse { // ProviderRequest Implementation // ============================================================================ +fn append_input_content_text(buffer: &mut String, content: &InputContent) { + match content { + InputContent::InputText { text } => buffer.push_str(text), + InputContent::InputImage { .. } => buffer.push_str("[Image]"), + InputContent::InputFile { .. } => buffer.push_str("[File]"), + InputContent::InputAudio { .. } => buffer.push_str("[Audio]"), + } +} + +fn append_content_items_text(buffer: &mut String, content_items: &[InputContent]) { + for content in content_items { + // Preserve existing behavior: each content item is prefixed with a space. 
+ buffer.push(' '); + append_input_content_text(buffer, content); + } +} + +fn append_message_content_text(buffer: &mut String, content: &MessageContent) { + match content { + MessageContent::Text(text) => buffer.push_str(text), + MessageContent::Items(content_items) => append_content_items_text(buffer, content_items), + } +} + impl ProviderRequest for ResponsesAPIRequest { fn model(&self) -> &str { &self.model @@ -1031,36 +1076,27 @@ impl ProviderRequest for ResponsesAPIRequest { fn extract_messages_text(&self) -> String { match &self.input { InputParam::Text(text) => text.clone(), - InputParam::Items(items) => { - items.iter().fold(String::new(), |acc, item| { - match item { - InputItem::Message(msg) => { - let content_text = match &msg.content { - MessageContent::Text(text) => text.clone(), - MessageContent::Items(content_items) => { - content_items.iter().fold(String::new(), |acc, content| { - acc + " " - + &match content { - InputContent::InputText { text } => text.clone(), - InputContent::InputImage { .. } => { - "[Image]".to_string() - } - InputContent::InputFile { .. } => { - "[File]".to_string() - } - InputContent::InputAudio { .. } => { - "[Audio]".to_string() - } - } - }) - } - }; - acc + " " + &content_text - } - // Skip non-message items (references, outputs, etc.) - _ => acc, + InputParam::SingleItem(item) => { + // Normalize single-item input for extraction behavior parity. + match item { + InputItem::Message(msg) => { + let mut extracted = String::new(); + append_message_content_text(&mut extracted, &msg.content); + extracted } - }) + _ => String::new(), + } + } + InputParam::Items(items) => { + let mut extracted = String::new(); + for item in items { + if let InputItem::Message(msg) = item { + // Preserve existing behavior: each message is prefixed with a space. 
+ extracted.push(' '); + append_message_content_text(&mut extracted, &msg.content); + } + } + extracted } } } @@ -1068,6 +1104,20 @@ impl ProviderRequest for ResponsesAPIRequest { fn get_recent_user_message(&self) -> Option { match &self.input { InputParam::Text(text) => Some(text.clone()), + InputParam::SingleItem(item) => match item { + InputItem::Message(msg) if matches!(msg.role, MessageRole::User) => { + match &msg.content { + MessageContent::Text(text) => Some(text.clone()), + MessageContent::Items(content_items) => { + content_items.iter().find_map(|content| match content { + InputContent::InputText { text } => Some(text.clone()), + _ => None, + }) + } + } + } + _ => None, + }, InputParam::Items(items) => { items.iter().rev().find_map(|item| { match item { @@ -1097,6 +1147,9 @@ impl ProviderRequest for ResponsesAPIRequest { .iter() .filter_map(|tool| match tool { Tool::Function { name, .. } => Some(name.clone()), + Tool::Custom { + name: Some(name), .. + } => Some(name.clone()), // Other tool types don't have user-defined names _ => None, }) @@ -1366,6 +1419,7 @@ impl crate::providers::streaming_response::ProviderStreamResponse for ResponsesA #[cfg(test)] mod tests { use super::*; + use serde_json::json; #[test] fn test_response_output_text_delta_deserialization() { @@ -1506,4 +1560,87 @@ mod tests { _ => panic!("Expected ResponseCompleted event"), } } + + #[test] + fn test_request_deserializes_custom_tool() { + let request = json!({ + "model": "gpt-5.3-codex", + "input": "apply the patch", + "tools": [ + { + "type": "custom", + "name": "run_patch", + "description": "Apply patch text", + "format": { + "kind": "patch", + "version": "v1" + } + } + ] + }); + + let bytes = serde_json::to_vec(&request).unwrap(); + let parsed = ResponsesAPIRequest::try_from(bytes.as_slice()).unwrap(); + let tools = parsed.tools.expect("tools should be present"); + assert_eq!(tools.len(), 1); + + match &tools[0] { + Tool::Custom { + name, + description, + format, + } => { + 
assert_eq!(name.as_deref(), Some("run_patch")); + assert_eq!(description.as_deref(), Some("Apply patch text")); + assert!(format.is_some()); + } + _ => panic!("expected custom tool"), + } + } + + #[test] + fn test_request_deserializes_web_search_tool_alias() { + let request = json!({ + "model": "gpt-5.3-codex", + "input": "find repository info", + "tools": [ + { + "type": "web_search", + "domains": ["github.com"], + "search_context_size": "medium" + } + ] + }); + + let bytes = serde_json::to_vec(&request).unwrap(); + let parsed = ResponsesAPIRequest::try_from(bytes.as_slice()).unwrap(); + let tools = parsed.tools.expect("tools should be present"); + assert_eq!(tools.len(), 1); + + match &tools[0] { + Tool::WebSearchPreview { + domains, + search_context_size, + .. + } => { + assert_eq!(domains.as_ref().map(Vec::len), Some(1)); + assert_eq!(search_context_size.as_deref(), Some("medium")); + } + _ => panic!("expected web search preview tool"), + } + } + + #[test] + fn test_request_deserializes_text_config_without_format() { + let request = json!({ + "model": "gpt-5.3-codex", + "input": "hello", + "text": {} + }); + + let bytes = serde_json::to_vec(&request).unwrap(); + let parsed = ResponsesAPIRequest::try_from(bytes.as_slice()).unwrap(); + assert!(parsed.text.is_some()); + assert!(parsed.text.unwrap().format.is_none()); + } } diff --git a/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs b/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs index 2aeb34ac..92589ccf 100644 --- a/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs +++ b/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs @@ -74,6 +74,7 @@ pub struct ResponsesAPIStreamBuffer { /// Lifecycle state flags created_emitted: bool, in_progress_emitted: bool, + finalized: bool, /// Track which output items we've added output_items_added: HashMap, // output_index -> item_id @@ -109,6 +110,7 @@ impl 
ResponsesAPIStreamBuffer { upstream_response_metadata: None, created_emitted: false, in_progress_emitted: false, + finalized: false, output_items_added: HashMap::new(), text_content: HashMap::new(), function_arguments: HashMap::new(), @@ -236,7 +238,7 @@ impl ResponsesAPIStreamBuffer { }), store: Some(true), text: Some(TextConfig { - format: TextFormat::Text, + format: Some(TextFormat::Text), }), audio: None, modalities: None, @@ -255,8 +257,38 @@ impl ResponsesAPIStreamBuffer { /// Finalize the response by emitting all *.done events and response.completed. /// Call this when the stream is complete (after seeing [DONE] or end_of_stream). pub fn finalize(&mut self) { + // Idempotent finalize: avoid duplicate response.completed loops. + if self.finalized { + return; + } + self.finalized = true; + let mut events = Vec::new(); + // Ensure lifecycle prelude is emitted even if finalize is triggered + // by finish_reason before any prior delta was processed. + if !self.created_emitted { + if self.response_id.is_none() { + self.response_id = Some(format!( + "resp_{}", + uuid::Uuid::new_v4().to_string().replace("-", "") + )); + self.created_at = Some( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + ); + self.model = Some("unknown".to_string()); + } + events.push(self.create_response_created_event()); + self.created_emitted = true; + } + if !self.in_progress_emitted { + events.push(self.create_response_in_progress_event()); + self.in_progress_emitted = true; + } + // Emit done events for all accumulated content // Text content done events @@ -443,6 +475,12 @@ impl SseStreamBufferTrait for ResponsesAPIStreamBuffer { } }; + // Explicit completion marker from transform layer. + if matches!(stream_event.as_ref(), ResponsesAPIStreamEvent::Done { .. 
}) { + self.finalize(); + return; + } + let mut events = Vec::new(); // Capture upstream metadata from ResponseCreated or ResponseInProgress if present @@ -789,4 +827,30 @@ mod tests { println!("✓ NO completion events (partial stream, no [DONE])"); println!("✓ Arguments accumulated: '{{\"location\":\"'\n"); } + + #[test] + fn test_finish_reason_without_done_still_finalizes_once() { + let raw_input = r#"data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"},"finish_reason":null}]} + +data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}"#; + + let client_api = SupportedAPIsFromClient::OpenAIResponsesAPI(OpenAIApi::Responses); + let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + + let stream_iter = SseStreamIter::try_from(raw_input.as_bytes()).unwrap(); + let mut buffer = ResponsesAPIStreamBuffer::new(); + + for raw_event in stream_iter { + let transformed_event = + SseEvent::try_from((raw_event, &client_api, &upstream_api)).unwrap(); + buffer.add_transformed_event(transformed_event); + } + + let output = String::from_utf8_lossy(&buffer.to_bytes()).to_string(); + let completed_count = output.matches("event: response.completed").count(); + assert_eq!( + completed_count, 1, + "response.completed should be emitted exactly once" + ); + } } diff --git a/crates/hermesllm/src/clients/endpoints.rs b/crates/hermesllm/src/clients/endpoints.rs index eff96cc5..23e14604 100644 --- a/crates/hermesllm/src/clients/endpoints.rs +++ b/crates/hermesllm/src/clients/endpoints.rs @@ -184,8 +184,8 @@ impl SupportedAPIsFromClient { SupportedAPIsFromClient::OpenAIResponsesAPI(_) => { // For Responses API, check if provider supports it, otherwise translate to chat/completions match provider_id { - // OpenAI and compatible providers that 
support /v1/responses - ProviderId::OpenAI => route_by_provider("/responses"), + // Providers that support /v1/responses natively + ProviderId::OpenAI | ProviderId::XAI => route_by_provider("/responses"), // All other providers: translate to /chat/completions _ => route_by_provider("/chat/completions"), } @@ -654,4 +654,19 @@ mod tests { "/custom/azure/path/gpt-4-deployment/chat/completions?api-version=2025-01-01-preview" ); } + + #[test] + fn test_responses_api_targets_xai_native_responses_endpoint() { + let api = SupportedAPIsFromClient::OpenAIResponsesAPI(OpenAIApi::Responses); + assert_eq!( + api.target_endpoint_for_provider( + &ProviderId::XAI, + "/v1/responses", + "grok-4-1-fast-reasoning", + false, + None + ), + "/v1/responses" + ); + } } diff --git a/crates/hermesllm/src/providers/id.rs b/crates/hermesllm/src/providers/id.rs index fff73f15..11008711 100644 --- a/crates/hermesllm/src/providers/id.rs +++ b/crates/hermesllm/src/providers/id.rs @@ -166,10 +166,11 @@ impl ProviderId { SupportedAPIsFromClient::OpenAIChatCompletions(_), ) => SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions), - // OpenAI Responses API - only OpenAI supports this - (ProviderId::OpenAI, SupportedAPIsFromClient::OpenAIResponsesAPI(_)) => { - SupportedUpstreamAPIs::OpenAIResponsesAPI(OpenAIApi::Responses) - } + // OpenAI Responses API - OpenAI and xAI support this natively + ( + ProviderId::OpenAI | ProviderId::XAI, + SupportedAPIsFromClient::OpenAIResponsesAPI(_), + ) => SupportedUpstreamAPIs::OpenAIResponsesAPI(OpenAIApi::Responses), // Amazon Bedrock natively supports Bedrock APIs (ProviderId::AmazonBedrock, SupportedAPIsFromClient::OpenAIChatCompletions(_)) => { @@ -328,4 +329,16 @@ mod tests { "AmazonBedrock should have models (mapped to amazon)" ); } + + #[test] + fn test_xai_uses_responses_api_for_responses_clients() { + use crate::clients::endpoints::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; + + let client_api = 
SupportedAPIsFromClient::OpenAIResponsesAPI(OpenAIApi::Responses); + let upstream = ProviderId::XAI.compatible_api_for_client(&client_api, false); + assert!(matches!( + upstream, + SupportedUpstreamAPIs::OpenAIResponsesAPI(OpenAIApi::Responses) + )); + } } diff --git a/crates/hermesllm/src/providers/request.rs b/crates/hermesllm/src/providers/request.rs index e97e8a68..92688133 100644 --- a/crates/hermesllm/src/providers/request.rs +++ b/crates/hermesllm/src/providers/request.rs @@ -5,6 +5,7 @@ use crate::apis::amazon_bedrock::{ConverseRequest, ConverseStreamRequest}; use crate::apis::openai_responses::ResponsesAPIRequest; use crate::clients::endpoints::SupportedAPIsFromClient; use crate::clients::endpoints::SupportedUpstreamAPIs; +use crate::ProviderId; use serde_json::Value; use std::collections::HashMap; @@ -70,6 +71,25 @@ impl ProviderRequestType { Self::ResponsesAPIRequest(r) => r.set_messages(messages), } } + + /// Apply provider-specific request normalization before sending upstream. + pub fn normalize_for_upstream( + &mut self, + provider_id: ProviderId, + upstream_api: &SupportedUpstreamAPIs, + ) { + if provider_id == ProviderId::XAI + && matches!( + upstream_api, + SupportedUpstreamAPIs::OpenAIChatCompletions(_) + ) + { + if let Self::ChatCompletionsRequest(req) = self { + // xAI's legacy live-search shape is deprecated on chat/completions. 
+ req.web_search_options = None; + } + } + } } impl ProviderRequest for ProviderRequestType { @@ -787,6 +807,62 @@ mod tests { } } + #[test] + fn test_normalize_for_upstream_xai_clears_chat_web_search_options() { + use crate::apis::openai::{Message, MessageContent, OpenAIApi, Role}; + + let mut request = ProviderRequestType::ChatCompletionsRequest(ChatCompletionsRequest { + model: "grok-4".to_string(), + messages: vec![Message { + role: Role::User, + content: Some(MessageContent::Text("hello".to_string())), + name: None, + tool_calls: None, + tool_call_id: None, + }], + web_search_options: Some(serde_json::json!({"search_context_size":"medium"})), + ..Default::default() + }); + + request.normalize_for_upstream( + ProviderId::XAI, + &SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions), + ); + + let ProviderRequestType::ChatCompletionsRequest(req) = request else { + panic!("expected chat request"); + }; + assert!(req.web_search_options.is_none()); + } + + #[test] + fn test_normalize_for_upstream_non_xai_keeps_chat_web_search_options() { + use crate::apis::openai::{Message, MessageContent, OpenAIApi, Role}; + + let mut request = ProviderRequestType::ChatCompletionsRequest(ChatCompletionsRequest { + model: "gpt-4o".to_string(), + messages: vec![Message { + role: Role::User, + content: Some(MessageContent::Text("hello".to_string())), + name: None, + tool_calls: None, + tool_call_id: None, + }], + web_search_options: Some(serde_json::json!({"search_context_size":"medium"})), + ..Default::default() + }); + + request.normalize_for_upstream( + ProviderId::OpenAI, + &SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions), + ); + + let ProviderRequestType::ChatCompletionsRequest(req) = request else { + panic!("expected chat request"); + }; + assert!(req.web_search_options.is_some()); + } + #[test] fn test_responses_api_to_anthropic_messages_conversion() { use crate::apis::anthropic::AnthropicApi::Messages; diff --git 
a/crates/hermesllm/src/transforms/request/from_openai.rs b/crates/hermesllm/src/transforms/request/from_openai.rs index ddc3b1ca..f2e8ab0d 100644 --- a/crates/hermesllm/src/transforms/request/from_openai.rs +++ b/crates/hermesllm/src/transforms/request/from_openai.rs @@ -10,7 +10,8 @@ use crate::apis::anthropic::{ ToolResultContent, }; use crate::apis::openai::{ - ChatCompletionsRequest, Message, MessageContent, Role, Tool, ToolChoice, ToolChoiceType, + ChatCompletionsRequest, FunctionCall as OpenAIFunctionCall, Message, MessageContent, Role, + Tool, ToolCall as OpenAIToolCall, ToolChoice, ToolChoiceType, }; use crate::apis::openai_responses::{ @@ -65,6 +66,14 @@ impl TryFrom for Vec { Ok(messages) } + InputParam::SingleItem(item) => { + // Some clients send a single object instead of an array. + let nested = ResponsesInputConverter { + input: InputParam::Items(vec![item]), + instructions: converter.instructions, + }; + Vec::::try_from(nested) + } InputParam::Items(items) => { // Convert input items to messages let mut converted_messages = Vec::new(); @@ -82,82 +91,145 @@ impl TryFrom for Vec { // Convert each input item for item in items { - if let InputItem::Message(input_msg) = item { - let role = match input_msg.role { - MessageRole::User => Role::User, - MessageRole::Assistant => Role::Assistant, - MessageRole::System => Role::System, - MessageRole::Developer => Role::System, // Map developer to system - }; + match item { + InputItem::Message(input_msg) => { + let role = match input_msg.role { + MessageRole::User => Role::User, + MessageRole::Assistant => Role::Assistant, + MessageRole::System => Role::System, + MessageRole::Developer => Role::System, // Map developer to system + MessageRole::Tool => Role::Tool, + }; - // Convert content based on MessageContent type - let content = match &input_msg.content { - crate::apis::openai_responses::MessageContent::Text(text) => { - // Simple text content - MessageContent::Text(text.clone()) - } - 
crate::apis::openai_responses::MessageContent::Items(content_items) => { - // Check if it's a single text item (can use simple text format) - if content_items.len() == 1 { - if let InputContent::InputText { text } = &content_items[0] { - MessageContent::Text(text.clone()) + // Convert content based on MessageContent type + let content = match &input_msg.content { + crate::apis::openai_responses::MessageContent::Text(text) => { + // Simple text content + MessageContent::Text(text.clone()) + } + crate::apis::openai_responses::MessageContent::Items( + content_items, + ) => { + // Check if it's a single text item (can use simple text format) + if content_items.len() == 1 { + if let InputContent::InputText { text } = &content_items[0] + { + MessageContent::Text(text.clone()) + } else { + // Single non-text item - use parts format + MessageContent::Parts( + content_items + .iter() + .filter_map(|c| match c { + InputContent::InputText { text } => { + Some(crate::apis::openai::ContentPart::Text { + text: text.clone(), + }) + } + InputContent::InputImage { image_url, .. } => { + Some(crate::apis::openai::ContentPart::ImageUrl { + image_url: crate::apis::openai::ImageUrl { + url: image_url.clone(), + detail: None, + }, + }) + } + InputContent::InputFile { .. } => None, // Skip files for now + InputContent::InputAudio { .. } => None, // Skip audio for now + }) + .collect(), + ) + } } else { - // Single non-text item - use parts format + // Multiple content items - convert to parts MessageContent::Parts( - content_items.iter() + content_items + .iter() .filter_map(|c| match c { InputContent::InputText { text } => { - Some(crate::apis::openai::ContentPart::Text { text: text.clone() }) + Some(crate::apis::openai::ContentPart::Text { + text: text.clone(), + }) } InputContent::InputImage { image_url, .. } => { Some(crate::apis::openai::ContentPart::ImageUrl { image_url: crate::apis::openai::ImageUrl { url: image_url.clone(), detail: None, - } + }, }) } InputContent::InputFile { .. 
} => None, // Skip files for now InputContent::InputAudio { .. } => None, // Skip audio for now }) - .collect() + .collect(), ) } - } else { - // Multiple content items - convert to parts - MessageContent::Parts( - content_items - .iter() - .filter_map(|c| match c { - InputContent::InputText { text } => { - Some(crate::apis::openai::ContentPart::Text { - text: text.clone(), - }) - } - InputContent::InputImage { image_url, .. } => Some( - crate::apis::openai::ContentPart::ImageUrl { - image_url: crate::apis::openai::ImageUrl { - url: image_url.clone(), - detail: None, - }, - }, - ), - InputContent::InputFile { .. } => None, // Skip files for now - InputContent::InputAudio { .. } => None, // Skip audio for now - }) - .collect(), - ) + } + }; + + converted_messages.push(Message { + role, + content: Some(content), + name: None, + tool_call_id: None, + tool_calls: None, + }); + } + InputItem::FunctionCallOutput { + item_type: _, + call_id, + output, + } => { + // Preserve tool result so upstream models do not re-issue the same tool call. + let output_text = match output { + serde_json::Value::String(s) => s.clone(), + other => serde_json::to_string(&other).unwrap_or_default(), + }; + converted_messages.push(Message { + role: Role::Tool, + content: Some(MessageContent::Text(output_text)), + name: None, + tool_call_id: Some(call_id), + tool_calls: None, + }); + } + InputItem::FunctionCall { + item_type: _, + name, + arguments, + call_id, + } => { + let tool_call = OpenAIToolCall { + id: call_id, + call_type: "function".to_string(), + function: OpenAIFunctionCall { name, arguments }, + }; + + // Prefer attaching tool_calls to the preceding assistant message when present. 
+ if let Some(last) = converted_messages.last_mut() { + if matches!(last.role, Role::Assistant) { + if let Some(existing) = &mut last.tool_calls { + existing.push(tool_call); + } else { + last.tool_calls = Some(vec![tool_call]); + } + continue; } } - }; - converted_messages.push(Message { - role, - content: Some(content), - name: None, - tool_call_id: None, - tool_calls: None, - }); + converted_messages.push(Message { + role: Role::Assistant, + content: None, + name: None, + tool_call_id: None, + tool_calls: Some(vec![tool_call]), + }); + } + InputItem::ItemReference { .. } => { + // Item references/unknown entries are metadata-like and can be skipped + // for chat-completions conversion. + } } } @@ -397,6 +469,170 @@ impl TryFrom for ChatCompletionsRequest { type Error = TransformError; fn try_from(req: ResponsesAPIRequest) -> Result { + fn normalize_function_parameters( + parameters: Option, + fallback_extra: Option, + ) -> serde_json::Value { + // ChatCompletions function tools require JSON Schema with top-level type=object. + let mut base = serde_json::json!({ + "type": "object", + "properties": {}, + }); + + if let Some(serde_json::Value::Object(mut obj)) = parameters { + // Enforce a valid object schema shape regardless of upstream tool format. 
+ obj.insert( + "type".to_string(), + serde_json::Value::String("object".to_string()), + ); + if !obj.contains_key("properties") { + obj.insert( + "properties".to_string(), + serde_json::Value::Object(serde_json::Map::new()), + ); + } + base = serde_json::Value::Object(obj); + } + + if let Some(extra) = fallback_extra { + if let serde_json::Value::Object(ref mut map) = base { + map.insert("x-custom-format".to_string(), extra); + } + } + + base + } + + let mut converted_chat_tools: Vec = Vec::new(); + let mut web_search_options: Option = None; + + if let Some(tools) = req.tools.clone() { + for (idx, tool) in tools.into_iter().enumerate() { + match tool { + ResponsesTool::Function { + name, + description, + parameters, + strict, + } => converted_chat_tools.push(Tool { + tool_type: "function".to_string(), + function: crate::apis::openai::Function { + name, + description, + parameters: normalize_function_parameters(parameters, None), + strict, + }, + }), + ResponsesTool::WebSearchPreview { + search_context_size, + user_location, + .. 
+ } => { + if web_search_options.is_none() { + let user_location_value = user_location.map(|loc| { + let mut approx = serde_json::Map::new(); + if let Some(city) = loc.city { + approx.insert( + "city".to_string(), + serde_json::Value::String(city), + ); + } + if let Some(country) = loc.country { + approx.insert( + "country".to_string(), + serde_json::Value::String(country), + ); + } + if let Some(region) = loc.region { + approx.insert( + "region".to_string(), + serde_json::Value::String(region), + ); + } + if let Some(timezone) = loc.timezone { + approx.insert( + "timezone".to_string(), + serde_json::Value::String(timezone), + ); + } + + serde_json::json!({ + "type": loc.location_type, + "approximate": serde_json::Value::Object(approx), + }) + }); + + let mut web_search = serde_json::Map::new(); + if let Some(size) = search_context_size { + web_search.insert( + "search_context_size".to_string(), + serde_json::Value::String(size), + ); + } + if let Some(location) = user_location_value { + web_search.insert("user_location".to_string(), location); + } + web_search_options = Some(serde_json::Value::Object(web_search)); + } + } + ResponsesTool::Custom { + name, + description, + format, + } => { + // Custom tools do not have a strict ChatCompletions equivalent for all + // providers. Map them to a permissive function tool for compatibility. + let tool_name = name.unwrap_or_else(|| format!("custom_tool_{}", idx + 1)); + let parameters = normalize_function_parameters( + Some(serde_json::json!({ + "type": "object", + "properties": { + "input": { "type": "string" } + }, + "required": ["input"], + "additionalProperties": true, + })), + format, + ); + + converted_chat_tools.push(Tool { + tool_type: "function".to_string(), + function: crate::apis::openai::Function { + name: tool_name, + description, + parameters, + strict: Some(false), + }, + }); + } + ResponsesTool::FileSearch { .. 
} => { + return Err(TransformError::UnsupportedConversion( + "FileSearch tool is not supported in ChatCompletions API. Only function/custom/web search tools are supported in this conversion." + .to_string(), + )); + } + ResponsesTool::CodeInterpreter => { + return Err(TransformError::UnsupportedConversion( + "CodeInterpreter tool is not supported in ChatCompletions API conversion." + .to_string(), + )); + } + ResponsesTool::Computer { .. } => { + return Err(TransformError::UnsupportedConversion( + "Computer tool is not supported in ChatCompletions API conversion." + .to_string(), + )); + } + } + } + } + + let tools = if converted_chat_tools.is_empty() { + None + } else { + Some(converted_chat_tools) + }; + // Convert input to messages using the shared converter let converter = ResponsesInputConverter { input: req.input, @@ -418,57 +654,24 @@ impl TryFrom for ChatCompletionsRequest { service_tier: req.service_tier, top_logprobs: req.top_logprobs.map(|t| t as u32), modalities: req.modalities.map(|mods| { - mods.into_iter().map(|m| { - match m { + mods.into_iter() + .map(|m| match m { Modality::Text => "text".to_string(), Modality::Audio => "audio".to_string(), - } - }).collect() + }) + .collect() }), - stream_options: req.stream_options.map(|opts| { - crate::apis::openai::StreamOptions { + stream_options: req + .stream_options + .map(|opts| crate::apis::openai::StreamOptions { include_usage: opts.include_usage, - } + }), + reasoning_effort: req.reasoning_effort.map(|effort| match effort { + ReasoningEffort::Low => "low".to_string(), + ReasoningEffort::Medium => "medium".to_string(), + ReasoningEffort::High => "high".to_string(), }), - reasoning_effort: req.reasoning_effort.map(|effort| { - match effort { - ReasoningEffort::Low => "low".to_string(), - ReasoningEffort::Medium => "medium".to_string(), - ReasoningEffort::High => "high".to_string(), - } - }), - tools: req.tools.map(|tools| { - tools.into_iter().map(|tool| { - - // Only convert Function tools - other types 
are not supported in ChatCompletions - match tool { - ResponsesTool::Function { name, description, parameters, strict } => Ok(Tool { - tool_type: "function".to_string(), - function: crate::apis::openai::Function { - name, - description, - parameters: parameters.unwrap_or_else(|| serde_json::json!({ - "type": "object", - "properties": {} - })), - strict, - } - }), - ResponsesTool::FileSearch { .. } => Err(TransformError::UnsupportedConversion( - "FileSearch tool is not supported in ChatCompletions API. Only function tools are supported.".to_string() - )), - ResponsesTool::WebSearchPreview { .. } => Err(TransformError::UnsupportedConversion( - "WebSearchPreview tool is not supported in ChatCompletions API. Only function tools are supported.".to_string() - )), - ResponsesTool::CodeInterpreter => Err(TransformError::UnsupportedConversion( - "CodeInterpreter tool is not supported in ChatCompletions API. Only function tools are supported.".to_string() - )), - ResponsesTool::Computer { .. } => Err(TransformError::UnsupportedConversion( - "Computer tool is not supported in ChatCompletions API. Only function tools are supported.".to_string() - )), - } - }).collect::, _>>() - }).transpose()?, + tools, tool_choice: req.tool_choice.map(|choice| { match choice { ResponsesToolChoice::String(s) => { @@ -481,11 +684,14 @@ impl TryFrom for ChatCompletionsRequest { } ResponsesToolChoice::Named { function, .. 
} => ToolChoice::Function { choice_type: "function".to_string(), - function: crate::apis::openai::FunctionChoice { name: function.name } - } + function: crate::apis::openai::FunctionChoice { + name: function.name, + }, + }, } }), parallel_tool_calls: req.parallel_tool_calls, + web_search_options, ..Default::default() }) } @@ -1027,4 +1233,235 @@ mod tests { panic!("Expected text content block"); } } + + #[test] + fn test_responses_custom_tool_maps_to_function_tool_for_chat_completions() { + use crate::apis::openai_responses::{ + InputParam, ResponsesAPIRequest, Tool as ResponsesTool, + }; + + let req = ResponsesAPIRequest { + model: "gpt-5.3-codex".to_string(), + input: InputParam::Text("use custom tool".to_string()), + tools: Some(vec![ResponsesTool::Custom { + name: Some("run_patch".to_string()), + description: Some("Apply structured patch".to_string()), + format: Some(serde_json::json!({ + "kind": "patch", + "version": "v1" + })), + }]), + include: None, + parallel_tool_calls: None, + store: None, + instructions: None, + stream: None, + stream_options: None, + conversation: None, + tool_choice: None, + max_output_tokens: None, + temperature: None, + top_p: None, + metadata: None, + previous_response_id: None, + modalities: None, + audio: None, + text: None, + reasoning_effort: None, + truncation: None, + user: None, + max_tool_calls: None, + service_tier: None, + background: None, + top_logprobs: None, + }; + + let converted = ChatCompletionsRequest::try_from(req).expect("conversion should succeed"); + let tools = converted.tools.expect("tools should be present"); + assert_eq!(tools.len(), 1); + assert_eq!(tools[0].tool_type, "function"); + assert_eq!(tools[0].function.name, "run_patch"); + assert_eq!( + tools[0].function.description.as_deref(), + Some("Apply structured patch") + ); + } + + #[test] + fn test_responses_web_search_maps_to_chat_web_search_options() { + use crate::apis::openai_responses::{ + InputParam, ResponsesAPIRequest, Tool as ResponsesTool, 
UserLocation, + }; + + let req = ResponsesAPIRequest { + model: "gpt-5.3-codex".to_string(), + input: InputParam::Text("find project docs".to_string()), + tools: Some(vec![ResponsesTool::WebSearchPreview { + domains: Some(vec!["docs.planoai.dev".to_string()]), + search_context_size: Some("medium".to_string()), + user_location: Some(UserLocation { + location_type: "approximate".to_string(), + city: Some("San Francisco".to_string()), + country: Some("US".to_string()), + region: Some("CA".to_string()), + timezone: Some("America/Los_Angeles".to_string()), + }), + }]), + include: None, + parallel_tool_calls: None, + store: None, + instructions: None, + stream: None, + stream_options: None, + conversation: None, + tool_choice: None, + max_output_tokens: None, + temperature: None, + top_p: None, + metadata: None, + previous_response_id: None, + modalities: None, + audio: None, + text: None, + reasoning_effort: None, + truncation: None, + user: None, + max_tool_calls: None, + service_tier: None, + background: None, + top_logprobs: None, + }; + + let converted = ChatCompletionsRequest::try_from(req).expect("conversion should succeed"); + assert!(converted.web_search_options.is_some()); + } + + #[test] + fn test_responses_function_call_output_maps_to_tool_message() { + use crate::apis::openai_responses::{ + InputItem, InputParam, ResponsesAPIRequest, Tool as ResponsesTool, + }; + + let req = ResponsesAPIRequest { + model: "gpt-5.3-codex".to_string(), + input: InputParam::Items(vec![InputItem::FunctionCallOutput { + item_type: "function_call_output".to_string(), + call_id: "call_123".to_string(), + output: serde_json::json!({"status":"ok","stdout":"hello"}), + }]), + tools: Some(vec![ResponsesTool::Function { + name: "exec_command".to_string(), + description: Some("Execute a shell command".to_string()), + parameters: Some(serde_json::json!({ + "type": "object", + "properties": { + "cmd": { "type": "string" } + }, + "required": ["cmd"] + })), + strict: Some(false), + }]), + 
include: None, + parallel_tool_calls: None, + store: None, + instructions: None, + stream: None, + stream_options: None, + conversation: None, + tool_choice: None, + max_output_tokens: None, + temperature: None, + top_p: None, + metadata: None, + previous_response_id: None, + modalities: None, + audio: None, + text: None, + reasoning_effort: None, + truncation: None, + user: None, + max_tool_calls: None, + service_tier: None, + background: None, + top_logprobs: None, + }; + + let converted = ChatCompletionsRequest::try_from(req).expect("conversion should succeed"); + assert_eq!(converted.messages.len(), 1); + assert!(matches!(converted.messages[0].role, Role::Tool)); + assert_eq!( + converted.messages[0].tool_call_id.as_deref(), + Some("call_123") + ); + } + + #[test] + fn test_responses_function_call_and_output_preserve_call_id_link() { + use crate::apis::openai_responses::{ + InputItem, InputMessage, MessageContent as ResponsesMessageContent, MessageRole, + ResponsesAPIRequest, + }; + + let req = ResponsesAPIRequest { + model: "gpt-5.3-codex".to_string(), + input: InputParam::Items(vec![ + InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: ResponsesMessageContent::Items(vec![]), + }), + InputItem::FunctionCall { + item_type: "function_call".to_string(), + name: "exec_command".to_string(), + arguments: "{\"cmd\":\"pwd\"}".to_string(), + call_id: "toolu_abc123".to_string(), + }, + InputItem::FunctionCallOutput { + item_type: "function_call_output".to_string(), + call_id: "toolu_abc123".to_string(), + output: serde_json::Value::String("ok".to_string()), + }, + ]), + tools: None, + include: None, + parallel_tool_calls: None, + store: None, + instructions: None, + stream: None, + stream_options: None, + conversation: None, + tool_choice: None, + max_output_tokens: None, + temperature: None, + top_p: None, + metadata: None, + previous_response_id: None, + modalities: None, + audio: None, + text: None, + reasoning_effort: None, + truncation: 
None, + user: None, + max_tool_calls: None, + service_tier: None, + background: None, + top_logprobs: None, + }; + + let converted = ChatCompletionsRequest::try_from(req).expect("conversion should succeed"); + assert_eq!(converted.messages.len(), 2); + + assert!(matches!(converted.messages[0].role, Role::Assistant)); + let tool_calls = converted.messages[0] + .tool_calls + .as_ref() + .expect("assistant tool_calls should be present"); + assert_eq!(tool_calls.len(), 1); + assert_eq!(tool_calls[0].id, "toolu_abc123"); + + assert!(matches!(converted.messages[1].role, Role::Tool)); + assert_eq!( + converted.messages[1].tool_call_id.as_deref(), + Some("toolu_abc123") + ); + } } diff --git a/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs b/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs index 328317bc..4aa719af 100644 --- a/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs +++ b/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs @@ -512,19 +512,12 @@ impl TryFrom for ResponsesAPIStreamEvent { } } - // Handle finish_reason - this is a completion signal - // Return an empty delta that the buffer can use to detect completion + // Handle finish_reason - this is a completion signal. + // Emit an explicit Done marker so the buffering layer can finalize + // even if an upstream [DONE] marker is missing/delayed. 
if choice.finish_reason.is_some() { - // Return a minimal text delta to signal completion - // The buffer will handle the finish_reason and generate response.completed - return Ok(ResponsesAPIStreamEvent::ResponseOutputTextDelta { - item_id: "".to_string(), // Buffer will fill this - output_index: choice.index as i32, - content_index: 0, - delta: "".to_string(), // Empty delta signals completion - logprobs: vec![], - obfuscation: None, - sequence_number: 0, // Buffer will fill this + return Ok(ResponsesAPIStreamEvent::Done { + sequence_number: 0, // Buffer will assign final sequence }); } diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 547ba166..7a353bcb 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -1046,7 +1046,8 @@ impl HttpContext for StreamContext { ); match ProviderRequestType::try_from((deserialized_client_request, upstream)) { - Ok(request) => { + Ok(mut request) => { + request.normalize_for_upstream(self.get_provider_id(), upstream); debug!( "request_id={}: upstream request payload: {}", self.request_identifier(), diff --git a/demos/README.md b/demos/README.md index a2613454..6e467a33 100644 --- a/demos/README.md +++ b/demos/README.md @@ -16,6 +16,7 @@ This directory contains demos showcasing Plano's capabilities as an AI-native pr | [Preference-Based Routing](llm_routing/preference_based_routing/) | Routes prompts to LLMs based on user-defined preferences and task type (e.g. code generation vs. 
understanding) | | [Model Alias Routing](llm_routing/model_alias_routing/) | Maps semantic aliases (`arch.summarize.v1`) to provider-specific models for centralized governance | | [Claude Code Router](llm_routing/claude_code_router/) | Extends Claude Code with multi-provider access and preference-aligned routing for coding tasks | +| [Codex Router](llm_routing/codex_router/) | Extends Codex CLI with multi-provider access and preference-aligned routing for coding tasks | ## Agent Orchestration diff --git a/demos/llm_routing/codex_router/README.md b/demos/llm_routing/codex_router/README.md new file mode 100644 index 00000000..d3662581 --- /dev/null +++ b/demos/llm_routing/codex_router/README.md @@ -0,0 +1,92 @@ +# Codex Router - Multi-Model Access with Intelligent Routing + +Plano extends Codex CLI to access multiple LLM providers through a single interface. This gives you: + +1. **Access to Models**: Connect to OpenAI, Anthropic, xAI, Gemini, and local models via Ollama +2. **Intelligent Routing via Preferences for Coding Tasks**: Configure which models handle specific development tasks: + - Code generation and implementation + - Code understanding and analysis + - Debugging and optimization + - Architecture and system design + +Uses a [1.5B preference-aligned router LLM](https://arxiv.org/abs/2506.16655) to automatically select the best model based on your request type. 
+
+## Benefits
+
+- **Single Interface**: Access multiple LLM providers through the same Codex CLI
+- **Task-Aware Routing**: Requests are analyzed and routed to models based on task type (code generation vs code understanding)
+- **Provider Flexibility**: Add or remove providers without changing your workflow
+- **Routing Transparency**: See which model handles each request and why
+
+## Quick Start
+
+### Prerequisites
+
+```bash
+# Install Codex CLI
+npm install -g @openai/codex
+
+# Install Plano CLI
+pip install planoai
+```
+
+### Step 1: Clone the Repo and Open the Demo
+
+```bash
+git clone https://github.com/katanemo/arch.git
+cd arch/demos/llm_routing/codex_router
+```
+
+### Step 2: Set API Keys
+
+```bash
+export OPENAI_API_KEY="your-openai-key-here"
+export ANTHROPIC_API_KEY="your-anthropic-key-here"
+export GROK_API_KEY="your-xai-key-here"
+export GEMINI_API_KEY="your-gemini-key-here"
+```
+
+### Step 3: Start Plano
+
+```bash
+planoai up
+# or: uvx planoai up
+```
+
+### Step 4: Launch Codex Through Plano
+
+```bash
+planoai cli-agent codex
+# or: uvx planoai cli-agent codex
+```
+
+By default, `planoai cli-agent codex` starts Codex with `gpt-5.3-codex`. With this demo config:
+
+- `project understanding` prompts are routed to `grok-4-1-fast-non-reasoning`
+- `code generation` prompts are routed to `gpt-5.3-codex`
+
+## Monitor Routing Decisions
+
+In a second terminal:
+
+```bash
+sh pretty_model_resolution.sh
+```
+
+This shows each request's model and the final model selected by Plano's router.
+
+## Configuration Highlights
+
+`config.yaml` demonstrates:
+
+- OpenAI default model for Codex sessions (`gpt-5.3-codex`)
+- Routing preference override for project understanding (`xai/grok-4-1-fast-non-reasoning`)
+- Additional providers (Anthropic, xAI, Gemini, Ollama local) to show cross-provider routing support
+
+## Optional Overrides
+
+Set a different Codex session model:
+
+```bash
+planoai cli-agent codex --settings='{"CODEX_MODEL":"gpt-5-2025-08-07"}'
+```
diff --git a/demos/llm_routing/codex_router/config.yaml b/demos/llm_routing/codex_router/config.yaml
new file mode 100644
index 00000000..7cafe641
--- /dev/null
+++ b/demos/llm_routing/codex_router/config.yaml
@@ -0,0 +1,38 @@
+version: v0.3.0
+
+listeners:
+  - type: model
+    name: model_listener
+    port: 12000
+
+model_providers:
+  # OpenAI models used by Codex defaults and preference routing
+  - model: openai/gpt-5.3-codex
+    default: true
+    access_key: $OPENAI_API_KEY
+    routing_preferences:
+      - name: code generation
+        description: generating new code snippets, functions, or boilerplate based on user prompts or requirements
+
+  - model: xai/grok-4-1-fast-non-reasoning
+    access_key: $GROK_API_KEY
+    routing_preferences:
+      - name: project understanding
+        description: understand repository structure, codebase, and code files, readmes, and other documentation
+
+  # Additional providers (optional): Codex can route to any configured model
+  # - model: anthropic/claude-sonnet-4-5
+  #   access_key: $ANTHROPIC_API_KEY
+
+  # - model: xai/grok-4-1-fast-non-reasoning
+  #   access_key: $GROK_API_KEY
+
+  - model: ollama/llama3.1
+    base_url: http://localhost:11434
+
+model_aliases:
+  arch.codex.default:
+    target: gpt-5.3-codex
+
+tracing:
+  random_sampling: 100
diff --git a/demos/llm_routing/codex_router/pretty_model_resolution.sh b/demos/llm_routing/codex_router/pretty_model_resolution.sh
new file mode 100644
index 00000000..b6187e65
--- /dev/null
+++ b/demos/llm_routing/codex_router/pretty_model_resolution.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash +# Pretty-print Plano MODEL_RESOLUTION lines from docker logs +# - hides Arch-Router +# - prints timestamp +# - colors MODEL_RESOLUTION red +# - colors req_model cyan +# - colors resolved_model magenta +# - removes provider and streaming + +docker logs -f plano 2>&1 \ +| awk ' +/MODEL_RESOLUTION:/ && $0 !~ /Arch-Router/ { + # extract timestamp between first [ and ] + ts="" + if (match($0, /\[[0-9-]+ [0-9:.]+\]/)) { + ts=substr($0, RSTART+1, RLENGTH-2) + } + + # split out after MODEL_RESOLUTION: + n = split($0, parts, /MODEL_RESOLUTION: */) + line = parts[2] + + # remove provider and streaming fields + sub(/ *provider='\''[^'\'']+'\''/, "", line) + sub(/ *streaming=(true|false)/, "", line) + + # highlight fields + gsub(/req_model='\''[^'\'']+'\''/, "\033[36m&\033[0m", line) + gsub(/resolved_model='\''[^'\'']+'\''/, "\033[35m&\033[0m", line) + + # print timestamp + MODEL_RESOLUTION + printf "\033[90m[%s]\033[0m \033[31mMODEL_RESOLUTION\033[0m: %s\n", ts, line +}'