diff --git a/cli/planoai/core.py b/cli/planoai/core.py index e9ddc7bd..174f37c0 100644 --- a/cli/planoai/core.py +++ b/cli/planoai/core.py @@ -10,7 +10,6 @@ from planoai.consts import ( PLANO_DOCKER_IMAGE, PLANO_DOCKER_NAME, ) -import subprocess from planoai.docker_cli import ( docker_container_status, docker_remove_container, @@ -147,26 +146,48 @@ def stop_docker_container(service=PLANO_DOCKER_NAME): log.info(f"Failed to shut down services: {str(e)}") -def start_cli_agent(plano_config_file=None, settings_json="{}"): - """Start a CLI client connected to Plano.""" - - with open(plano_config_file, "r") as file: - plano_config = file.read() - plano_config_yaml = yaml.safe_load(plano_config) - - # Get egress listener configuration - egress_config = plano_config_yaml.get("listeners", {}).get("egress_traffic", {}) - host = egress_config.get("host", "127.0.0.1") - port = egress_config.get("port", 12000) - - # Parse additional settings from command line +def _parse_cli_agent_settings(settings_json: str) -> dict: try: - additional_settings = json.loads(settings_json) if settings_json else {} + return json.loads(settings_json) if settings_json else {} except json.JSONDecodeError: log.error("Settings must be valid JSON") sys.exit(1) - # Set up environment variables + +def _resolve_cli_agent_endpoint(plano_config_yaml: dict) -> tuple[str, int]: + listeners = plano_config_yaml.get("listeners") + + if isinstance(listeners, dict): + egress_config = listeners.get("egress_traffic", {}) + host = egress_config.get("host") or egress_config.get("address") or "0.0.0.0" + port = egress_config.get("port", 12000) + return host, port + + if isinstance(listeners, list): + for listener in listeners: + if listener.get("type") in ["model", "model_listener"]: + host = listener.get("host") or listener.get("address") or "0.0.0.0" + port = listener.get("port", 12000) + return host, port + + return "0.0.0.0", 12000 + + +def _apply_non_interactive_env(env: dict, additional_settings: dict) -> None: + 
if additional_settings.get("NON_INTERACTIVE_MODE", False): + env.update( + { + "CI": "true", + "FORCE_COLOR": "0", + "NODE_NO_READLINE": "1", + "TERM": "dumb", + } + ) + + +def _start_claude_cli_agent( + host: str, port: int, plano_config_yaml: dict, additional_settings: dict +) -> None: env = os.environ.copy() env.update( { @@ -186,7 +207,6 @@ def start_cli_agent(plano_config_file=None, settings_json="{}"): "ANTHROPIC_SMALL_FAST_MODEL" ] else: - # Check if arch.claude.code.small.fast alias exists in model_aliases model_aliases = plano_config_yaml.get("model_aliases", {}) if "arch.claude.code.small.fast" in model_aliases: env["ANTHROPIC_SMALL_FAST_MODEL"] = "arch.claude.code.small.fast" @@ -196,23 +216,10 @@ def start_cli_agent(plano_config_file=None, settings_json="{}"): ) log.info("Or provide ANTHROPIC_SMALL_FAST_MODEL in --settings JSON") - # Non-interactive mode configuration from additional_settings only - if additional_settings.get("NON_INTERACTIVE_MODE", False): - env.update( - { - "CI": "true", - "FORCE_COLOR": "0", - "NODE_NO_READLINE": "1", - "TERM": "dumb", - } - ) + _apply_non_interactive_env(env, additional_settings) - # Build claude command arguments claude_args = [] - - # Add settings if provided, excluding those already handled as environment variables if additional_settings: - # Filter out settings that are already processed as environment variables claude_settings = { k: v for k, v in additional_settings.items() @@ -221,10 +228,8 @@ def start_cli_agent(plano_config_file=None, settings_json="{}"): if claude_settings: claude_args.append(f"--settings={json.dumps(claude_settings)}") - # Use claude from PATH claude_path = "claude" log.info(f"Connecting Claude Code Agent to Plano at {host}:{port}") - try: subprocess.run([claude_path] + claude_args, env=env, check=True) except subprocess.CalledProcessError as e: @@ -235,3 +240,61 @@ def start_cli_agent(plano_config_file=None, settings_json="{}"): f"{claude_path} not found. 
Make sure Claude Code is installed: npm install -g @anthropic-ai/claude-code" ) sys.exit(1) + + +def _start_codex_cli_agent(host: str, port: int, additional_settings: dict) -> None: + env = os.environ.copy() + env.update( + { + "OPENAI_API_KEY": "test", # Use test token for plano + "OPENAI_BASE_URL": f"http://{host}:{port}/v1", + "NO_PROXY": host, + "DISABLE_TELEMETRY": "true", + } + ) + _apply_non_interactive_env(env, additional_settings) + + codex_model = additional_settings.get("CODEX_MODEL", "gpt-5.3-codex") + codex_path = "codex" + codex_args = ["--model", codex_model] + + log.info( + f"Connecting Codex CLI Agent to Plano at {host}:{port} (default model: {codex_model})" + ) + try: + subprocess.run([codex_path] + codex_args, env=env, check=True) + except subprocess.CalledProcessError as e: + log.error(f"Error starting codex: {e}") + sys.exit(1) + except FileNotFoundError: + log.error( + f"{codex_path} not found. Make sure Codex CLI is installed: npm install -g @openai/codex" + ) + sys.exit(1) + + +def start_cli_agent( + plano_config_file=None, cli_agent_type="claude", settings_json="{}" +): + """Start a CLI client connected to Plano.""" + + with open(plano_config_file, "r") as file: + plano_config = file.read() + plano_config_yaml = yaml.safe_load(plano_config) + + host, port = _resolve_cli_agent_endpoint(plano_config_yaml) + + additional_settings = _parse_cli_agent_settings(settings_json) + + if cli_agent_type == "claude": + _start_claude_cli_agent(host, port, plano_config_yaml, additional_settings) + return + + if cli_agent_type == "codex": + _start_codex_cli_agent(host, port, additional_settings) + return + + log.error( + f"Unsupported cli agent type '{cli_agent_type}'. 
Supported values: claude, codex" + ) + sys.exit(1) diff --git a/cli/planoai/main.py b/cli/planoai/main.py index a63f294e..e9cdc0a0 100644 --- a/cli/planoai/main.py +++ b/cli/planoai/main.py @@ -1,3 +1,4 @@ +import json import os import multiprocessing import subprocess @@ -31,6 +32,7 @@ from planoai.trace_cmd import trace as trace_cmd, start_trace_listener_backgroun from planoai.consts import ( DEFAULT_OTEL_TRACING_GRPC_ENDPOINT, DEFAULT_NATIVE_OTEL_TRACING_GRPC_ENDPOINT, + NATIVE_PID_FILE, PLANO_DOCKER_IMAGE, PLANO_DOCKER_NAME, ) @@ -40,6 +42,30 @@ from planoai.versioning import check_version_status, get_latest_version, get_ver log = getLogger(__name__) +def _is_native_plano_running() -> bool: + if not os.path.exists(NATIVE_PID_FILE): + return False + try: + with open(NATIVE_PID_FILE, "r") as f: + pids = json.load(f) + except (OSError, json.JSONDecodeError): + return False + + envoy_pid = pids.get("envoy_pid") + brightstaff_pid = pids.get("brightstaff_pid") + if not isinstance(envoy_pid, int) or not isinstance(brightstaff_pid, int): + return False + + for pid in (envoy_pid, brightstaff_pid): + try: + os.kill(pid, 0) + except ProcessLookupError: + return False + except PermissionError: + continue + return True + + def _is_port_in_use(port: int) -> bool: """Check if a TCP port is already bound on localhost.""" import socket @@ -523,7 +549,7 @@ def logs(debug, follow, docker): @click.command() -@click.argument("type", type=click.Choice(["claude"]), required=True) +@click.argument("type", type=click.Choice(["claude", "codex"]), required=True) @click.argument("file", required=False) # Optional file argument @click.option( "--path", default=".", help="Path to the directory containing plano_config.yaml" @@ -536,14 +562,19 @@ def logs(debug, follow, docker): def cli_agent(type, file, path, settings): """Start a CLI agent connected to Plano. 
- CLI_AGENT: The type of CLI agent to start (currently only 'claude' is supported) + CLI_AGENT: The type of CLI agent to start ('claude' or 'codex') """ - # Check if plano docker container is running - plano_status = docker_container_status(PLANO_DOCKER_NAME) - if plano_status != "running": - log.error(f"plano docker container is not running (status: {plano_status})") - log.error("Please start plano using the 'planoai up' command.") + native_running = _is_native_plano_running() + docker_running = False + if not native_running: + docker_running = docker_container_status(PLANO_DOCKER_NAME) == "running" + + if not (native_running or docker_running): + log.error("Plano is not running.") + log.error( + "Start Plano first using 'planoai up ' (native or --docker mode)." + ) sys.exit(1) # Determine plano_config.yaml path @@ -553,7 +584,7 @@ def cli_agent(type, file, path, settings): sys.exit(1) try: - start_cli_agent(plano_config_file, settings) + start_cli_agent(plano_config_file, type, settings) except SystemExit: # Re-raise SystemExit to preserve exit codes raise diff --git a/cli/uv.lock b/cli/uv.lock index 45ccf82e..9d85bf85 100644 --- a/cli/uv.lock +++ b/cli/uv.lock @@ -337,7 +337,7 @@ wheels = [ [[package]] name = "planoai" -version = "0.4.7" +version = "0.4.9" source = { editable = "." 
} dependencies = [ { name = "click" }, diff --git a/crates/brightstaff/src/handlers/llm/mod.rs b/crates/brightstaff/src/handlers/llm/mod.rs index 64e780a4..b7b85e58 100644 --- a/crates/brightstaff/src/handlers/llm/mod.rs +++ b/crates/brightstaff/src/handlers/llm/mod.rs @@ -18,7 +18,7 @@ use std::sync::Arc; use tokio::sync::RwLock; use tracing::{debug, info, info_span, warn, Instrument}; -mod router; +pub(crate) mod router; use crate::app_state::AppState; use crate::handlers::request::extract_request_id; @@ -120,6 +120,7 @@ async fn llm_chat_inner( temperature, tool_names, user_message_preview, + inline_routing_policy, } = parsed; // Record LLM-specific span attributes @@ -186,6 +187,7 @@ async fn llm_chat_inner( &traceparent, &request_path, &request_id, + inline_routing_policy, ) .await } @@ -245,6 +247,7 @@ struct PreparedRequest { temperature: Option, tool_names: Option>, user_message_preview: Option, + inline_routing_policy: Option>, } /// Parse the body, resolve the model alias, and validate the model exists. 
@@ -256,7 +259,7 @@ async fn parse_and_validate_request( model_aliases: &Arc>>, llm_providers: &Arc>, ) -> Result>> { - let chat_request_bytes = request + let raw_bytes = request .collect() .await .map_err(|_| { @@ -267,10 +270,21 @@ async fn parse_and_validate_request( .to_bytes(); debug!( - body = %String::from_utf8_lossy(&chat_request_bytes), + body = %String::from_utf8_lossy(&raw_bytes), "request body received" ); + // Extract routing_policy from request body if present + let (chat_request_bytes, inline_routing_policy) = + crate::handlers::routing_service::extract_routing_policy(&raw_bytes, false).map_err( + |err| { + warn!(error = %err, "failed to parse request JSON"); + let mut r = Response::new(full(format!("Failed to parse request: {}", err))); + *r.status_mut() = StatusCode::BAD_REQUEST; + r + }, + )?; + let api_type = SupportedAPIsFromClient::from_endpoint(request_path).ok_or_else(|| { warn!(path = %request_path, "unsupported endpoint"); let mut r = Response::new(full(format!("Unsupported endpoint: {}", request_path))); @@ -296,6 +310,7 @@ async fn parse_and_validate_request( let temperature = client_request.get_temperature(); let is_streaming_request = client_request.is_streaming(); let alias_resolved_model = resolve_model_alias(&model_from_request, model_aliases); + let (provider_id, _) = get_provider_info(llm_providers, &alias_resolved_model).await; // Validate model exists in configuration if llm_providers @@ -332,6 +347,14 @@ async fn parse_and_validate_request( if client_request.remove_metadata_key("archgw_preference_config") { debug!("removed archgw_preference_config from metadata"); } + if client_request.remove_metadata_key("plano_preference_config") { + debug!("removed plano_preference_config from metadata"); + } + if let Some(ref client_api_kind) = client_api { + let upstream_api = + provider_id.compatible_api_for_client(client_api_kind, is_streaming_request); + client_request.normalize_for_upstream(provider_id, &upstream_api); + } 
Ok(PreparedRequest { client_request, @@ -344,6 +367,7 @@ async fn parse_and_validate_request( temperature, tool_names, user_message_preview, + inline_routing_policy, }) } diff --git a/crates/brightstaff/src/handlers/llm/router.rs b/crates/brightstaff/src/handlers/llm/router.rs index fbcc44a4..a53837f7 100644 --- a/crates/brightstaff/src/handlers/llm/router.rs +++ b/crates/brightstaff/src/handlers/llm/router.rs @@ -10,6 +10,7 @@ use crate::tracing::routing; pub struct RoutingResult { pub model_name: String, + pub route_name: Option, } pub struct RoutingError { @@ -37,6 +38,7 @@ pub async fn router_chat_get_upstream_model( traceparent: &str, request_path: &str, request_id: &str, + inline_usage_preferences: Option>, ) -> Result { // Clone metadata for routing before converting (which consumes client_request) let routing_metadata = client_request.metadata().clone(); @@ -75,16 +77,21 @@ pub async fn router_chat_get_upstream_model( "router request" ); - // Extract usage preferences from metadata - let usage_preferences_str: Option = routing_metadata.as_ref().and_then(|metadata| { - metadata - .get("plano_preference_config") - .map(|value| value.to_string()) - }); - - let usage_preferences: Option> = usage_preferences_str - .as_ref() - .and_then(|s| serde_yaml::from_str(s).ok()); + // Use inline preferences if provided, otherwise fall back to metadata extraction + let usage_preferences: Option> = if inline_usage_preferences.is_some() + { + inline_usage_preferences + } else { + let usage_preferences_str: Option = + routing_metadata.as_ref().and_then(|metadata| { + metadata + .get("plano_preference_config") + .map(|value| value.to_string()) + }); + usage_preferences_str + .as_ref() + .and_then(|s| serde_yaml::from_str(s).ok()) + }; // Prepare log message with latest message from chat request let latest_message_for_log = chat_request @@ -133,9 +140,12 @@ pub async fn router_chat_get_upstream_model( match routing_result { Ok(route) => match route { - Some((_, model_name)) => 
{ + Some((route_name, model_name)) => { current_span.record("route.selected_model", model_name.as_str()); - Ok(RoutingResult { model_name }) + Ok(RoutingResult { + model_name, + route_name: Some(route_name), + }) } None => { // No route determined, return sentinel value "none" @@ -145,6 +155,7 @@ pub async fn router_chat_get_upstream_model( Ok(RoutingResult { model_name: "none".to_string(), + route_name: None, }) } }, diff --git a/crates/brightstaff/src/handlers/mod.rs b/crates/brightstaff/src/handlers/mod.rs index da8fa442..23a85cd4 100644 --- a/crates/brightstaff/src/handlers/mod.rs +++ b/crates/brightstaff/src/handlers/mod.rs @@ -5,6 +5,7 @@ pub mod llm; pub mod models; pub mod request; pub mod response; +pub mod routing_service; pub mod utils; #[cfg(test)] diff --git a/crates/brightstaff/src/handlers/routing_service.rs b/crates/brightstaff/src/handlers/routing_service.rs new file mode 100644 index 00000000..4b4a2b6f --- /dev/null +++ b/crates/brightstaff/src/handlers/routing_service.rs @@ -0,0 +1,357 @@ +use bytes::Bytes; +use common::configuration::{ModelUsagePreference, SpanAttributes}; +use common::consts::{REQUEST_ID_HEADER, TRACE_PARENT_HEADER}; +use common::errors::BrightStaffError; +use hermesllm::clients::SupportedAPIsFromClient; +use hermesllm::ProviderRequestType; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::{Request, Response, StatusCode}; +use std::sync::Arc; +use tracing::{debug, info, info_span, warn, Instrument}; + +use crate::handlers::llm::router::router_chat_get_upstream_model; +use crate::router::llm::RouterService; +use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name}; + +const ROUTING_POLICY_SIZE_WARNING_BYTES: usize = 5120; + +/// Extracts `routing_policy` from a JSON body, returning the cleaned body bytes +/// and parsed preferences. 
The `routing_policy` field is removed from the JSON +/// before re-serializing so downstream parsers don't see the non-standard field. +/// +/// If `warn_on_size` is true, logs a warning when the serialized policy exceeds 5KB. +pub fn extract_routing_policy( + raw_bytes: &[u8], + warn_on_size: bool, +) -> Result<(Bytes, Option>), String> { + let mut json_body: serde_json::Value = serde_json::from_slice(raw_bytes) + .map_err(|err| format!("Failed to parse JSON: {}", err))?; + + let preferences = json_body + .as_object_mut() + .and_then(|obj| obj.remove("routing_policy")) + .and_then(|policy_value| { + if warn_on_size { + let policy_str = serde_json::to_string(&policy_value).unwrap_or_default(); + if policy_str.len() > ROUTING_POLICY_SIZE_WARNING_BYTES { + warn!( + size_bytes = policy_str.len(), + limit_bytes = ROUTING_POLICY_SIZE_WARNING_BYTES, + "routing_policy exceeds recommended size limit" + ); + } + } + match serde_json::from_value::>(policy_value) { + Ok(prefs) => { + info!( + num_models = prefs.len(), + "using inline routing_policy from request body" + ); + Some(prefs) + } + Err(err) => { + warn!(error = %err, "failed to parse routing_policy"); + None + } + } + }); + + let bytes = Bytes::from(serde_json::to_vec(&json_body).unwrap()); + Ok((bytes, preferences)) +} + +#[derive(serde::Serialize)] +struct RoutingDecisionResponse { + model: String, + route: Option, + trace_id: String, +} + +pub async fn routing_decision( + request: Request, + router_service: Arc, + request_path: String, + span_attributes: Arc>, +) -> Result>, hyper::Error> { + let request_headers = request.headers().clone(); + let request_id: String = request_headers + .get(REQUEST_ID_HEADER) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()) + .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + + let custom_attrs = + collect_custom_trace_attributes(&request_headers, span_attributes.as_ref().as_ref()); + + let request_span = info_span!( + "routing_decision", + component = "routing", + 
request_id = %request_id, + http.method = %request.method(), + http.path = %request_path, + ); + + routing_decision_inner( + request, + router_service, + request_id, + request_path, + request_headers, + custom_attrs, + ) + .instrument(request_span) + .await +} + +async fn routing_decision_inner( + request: Request, + router_service: Arc, + request_id: String, + request_path: String, + request_headers: hyper::HeaderMap, + custom_attrs: std::collections::HashMap, +) -> Result>, hyper::Error> { + set_service_name(operation_component::ROUTING); + opentelemetry::trace::get_active_span(|span| { + for (key, value) in &custom_attrs { + span.set_attribute(opentelemetry::KeyValue::new(key.clone(), value.clone())); + } + }); + + // Extract or generate traceparent + let traceparent: String = match request_headers + .get(TRACE_PARENT_HEADER) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()) + { + Some(tp) => tp, + None => { + let trace_id = uuid::Uuid::new_v4().to_string().replace("-", ""); + let generated_tp = format!("00-{}-0000000000000000-01", trace_id); + warn!( + generated_traceparent = %generated_tp, + "TRACE_PARENT header missing, generated new traceparent" + ); + generated_tp + } + }; + + // Extract trace_id from traceparent (format: 00-{trace_id}-{span_id}-{flags}) + let trace_id = traceparent + .split('-') + .nth(1) + .unwrap_or("unknown") + .to_string(); + + // Parse request body + let raw_bytes = request.collect().await?.to_bytes(); + + debug!( + body = %String::from_utf8_lossy(&raw_bytes), + "routing decision request body received" + ); + + // Extract routing_policy from request body before parsing as ProviderRequestType + let (chat_request_bytes, inline_preferences) = match extract_routing_policy(&raw_bytes, true) { + Ok(result) => result, + Err(err) => { + warn!(error = %err, "failed to parse request JSON"); + return Ok(BrightStaffError::InvalidRequest(format!( + "Failed to parse request JSON: {}", + err + )) + .into_response()); + } + }; + + let 
client_request = match ProviderRequestType::try_from(( + &chat_request_bytes[..], + &SupportedAPIsFromClient::from_endpoint(request_path.as_str()).unwrap(), + )) { + Ok(request) => request, + Err(err) => { + warn!(error = %err, "failed to parse request for routing decision"); + return Ok(BrightStaffError::InvalidRequest(format!( + "Failed to parse request: {}", + err + )) + .into_response()); + } + }; + + // Call the existing routing logic with inline preferences + let routing_result = router_chat_get_upstream_model( + router_service, + client_request, + &traceparent, + &request_path, + &request_id, + inline_preferences, + ) + .await; + + match routing_result { + Ok(result) => { + let response = RoutingDecisionResponse { + model: result.model_name, + route: result.route_name, + trace_id, + }; + + info!( + model = %response.model, + route = ?response.route, + "routing decision completed" + ); + + let json = serde_json::to_string(&response).unwrap(); + let body = Full::new(Bytes::from(json)) + .map_err(|never| match never {}) + .boxed(); + + Ok(Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(body) + .unwrap()) + } + Err(err) => { + warn!(error = %err.message, "routing decision failed"); + Ok(BrightStaffError::InternalServerError(err.message).into_response()) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn make_chat_body(extra_fields: &str) -> Vec { + let extra = if extra_fields.is_empty() { + String::new() + } else { + format!(", {}", extra_fields) + }; + format!( + r#"{{"model": "gpt-4o-mini", "messages": [{{"role": "user", "content": "hello"}}]{}}}"#, + extra + ) + .into_bytes() + } + + #[test] + fn extract_routing_policy_no_policy() { + let body = make_chat_body(""); + let (cleaned, prefs) = extract_routing_policy(&body, false).unwrap(); + + assert!(prefs.is_none()); + let cleaned_json: serde_json::Value = serde_json::from_slice(&cleaned).unwrap(); + assert_eq!(cleaned_json["model"], 
"gpt-4o-mini"); + assert!(cleaned_json.get("routing_policy").is_none()); + } + + #[test] + fn extract_routing_policy_valid_policy() { + let policy = r#""routing_policy": [ + { + "model": "openai/gpt-4o", + "routing_preferences": [ + {"name": "coding", "description": "code generation tasks"} + ] + }, + { + "model": "openai/gpt-4o-mini", + "routing_preferences": [ + {"name": "general", "description": "general questions"} + ] + } + ]"#; + let body = make_chat_body(policy); + let (cleaned, prefs) = extract_routing_policy(&body, false).unwrap(); + + let prefs = prefs.expect("should have parsed preferences"); + assert_eq!(prefs.len(), 2); + assert_eq!(prefs[0].model, "openai/gpt-4o"); + assert_eq!(prefs[0].routing_preferences[0].name, "coding"); + assert_eq!(prefs[1].model, "openai/gpt-4o-mini"); + assert_eq!(prefs[1].routing_preferences[0].name, "general"); + + // routing_policy should be stripped from cleaned body + let cleaned_json: serde_json::Value = serde_json::from_slice(&cleaned).unwrap(); + assert!(cleaned_json.get("routing_policy").is_none()); + assert_eq!(cleaned_json["model"], "gpt-4o-mini"); + } + + #[test] + fn extract_routing_policy_invalid_policy_returns_none() { + // routing_policy is present but has wrong shape + let policy = r#""routing_policy": "not-an-array""#; + let body = make_chat_body(policy); + let (cleaned, prefs) = extract_routing_policy(&body, false).unwrap(); + + // Invalid policy should be ignored (returns None), not error + assert!(prefs.is_none()); + // routing_policy should still be stripped from cleaned body + let cleaned_json: serde_json::Value = serde_json::from_slice(&cleaned).unwrap(); + assert!(cleaned_json.get("routing_policy").is_none()); + } + + #[test] + fn extract_routing_policy_invalid_json_returns_error() { + let body = b"not valid json"; + let result = extract_routing_policy(body, false); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("Failed to parse JSON")); + } + + #[test] + fn 
extract_routing_policy_empty_array() { + let policy = r#""routing_policy": []"#; + let body = make_chat_body(policy); + let (_, prefs) = extract_routing_policy(&body, false).unwrap(); + + let prefs = prefs.expect("empty array is valid"); + assert_eq!(prefs.len(), 0); + } + + #[test] + fn extract_routing_policy_preserves_other_fields() { + let policy = r#""routing_policy": [{"model": "gpt-4o", "routing_preferences": [{"name": "test", "description": "test"}]}], "temperature": 0.5, "max_tokens": 100"#; + let body = make_chat_body(policy); + let (cleaned, prefs) = extract_routing_policy(&body, false).unwrap(); + + assert!(prefs.is_some()); + let cleaned_json: serde_json::Value = serde_json::from_slice(&cleaned).unwrap(); + assert_eq!(cleaned_json["temperature"], 0.5); + assert_eq!(cleaned_json["max_tokens"], 100); + assert!(cleaned_json.get("routing_policy").is_none()); + } + + #[test] + fn routing_decision_response_serialization() { + let response = RoutingDecisionResponse { + model: "openai/gpt-4o".to_string(), + route: Some("code_generation".to_string()), + trace_id: "abc123".to_string(), + }; + let json = serde_json::to_string(&response).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["model"], "openai/gpt-4o"); + assert_eq!(parsed["route"], "code_generation"); + assert_eq!(parsed["trace_id"], "abc123"); + } + + #[test] + fn routing_decision_response_serialization_no_route() { + let response = RoutingDecisionResponse { + model: "none".to_string(), + route: None, + trace_id: "abc123".to_string(), + }; + let json = serde_json::to_string(&response).unwrap(); + let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); + assert_eq!(parsed["model"], "none"); + assert!(parsed["route"].is_null()); + } +} diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index be25e383..bd077c2f 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -3,6 +3,7 @@ use 
brightstaff::handlers::agents::orchestrator::agent_chat; use brightstaff::handlers::function_calling::function_calling_chat_handler; use brightstaff::handlers::llm::llm_chat; use brightstaff::handlers::models::list_models; +use brightstaff::handlers::routing_service::routing_decision; use brightstaff::router::llm::RouterService; use brightstaff::router::orchestrator::OrchestratorService; use brightstaff::state::memory::MemoryConversationalStorage; @@ -221,6 +222,24 @@ async fn route( } } + // --- Routing decision routes (/routing/...) --- + if let Some(stripped) = path.strip_prefix("/routing") { + let stripped = stripped.to_string(); + if matches!( + stripped.as_str(), + CHAT_COMPLETIONS_PATH | MESSAGES_PATH | OPENAI_RESPONSES_API_PATH + ) { + return routing_decision( + req, + Arc::clone(&state.router_service), + stripped, + Arc::clone(&state.span_attributes), + ) + .with_context(parent_cx) + .await; + } + } + // --- Standard routes --- match (req.method(), path.as_str()) { (&Method::POST, CHAT_COMPLETIONS_PATH | MESSAGES_PATH | OPENAI_RESPONSES_API_PATH) => { diff --git a/crates/brightstaff/src/state/mod.rs b/crates/brightstaff/src/state/mod.rs index da8c4077..aefebb3f 100644 --- a/crates/brightstaff/src/state/mod.rs +++ b/crates/brightstaff/src/state/mod.rs @@ -112,6 +112,7 @@ pub fn extract_input_items(input: &InputParam) -> Vec { }]), })] } + InputParam::SingleItem(item) => vec![item.clone()], InputParam::Items(items) => items.clone(), } } @@ -128,3 +129,101 @@ pub async fn retrieve_and_combine_input( let combined_input = storage.merge(&prev_state, current_input); Ok(combined_input) } + +#[cfg(test)] +mod tests { + use super::extract_input_items; + use hermesllm::apis::openai_responses::{ + InputContent, InputItem, InputMessage, InputParam, MessageContent, MessageRole, + }; + + #[test] + fn test_extract_input_items_converts_text_to_user_message_item() { + let extracted = extract_input_items(&InputParam::Text("hello world".to_string())); + 
assert_eq!(extracted.len(), 1); + + let InputItem::Message(message) = &extracted[0] else { + panic!("expected InputItem::Message"); + }; + assert!(matches!(message.role, MessageRole::User)); + + let MessageContent::Items(items) = &message.content else { + panic!("expected MessageContent::Items"); + }; + assert_eq!(items.len(), 1); + + let InputContent::InputText { text } = &items[0] else { + panic!("expected InputContent::InputText"); + }; + assert_eq!(text, "hello world"); + } + + #[test] + fn test_extract_input_items_preserves_single_item() { + let item = InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: MessageContent::Items(vec![InputContent::InputText { + text: "assistant note".to_string(), + }]), + }); + + let extracted = extract_input_items(&InputParam::SingleItem(item.clone())); + assert_eq!(extracted.len(), 1); + let InputItem::Message(message) = &extracted[0] else { + panic!("expected InputItem::Message"); + }; + assert!(matches!(message.role, MessageRole::Assistant)); + let MessageContent::Items(items) = &message.content else { + panic!("expected MessageContent::Items"); + }; + let InputContent::InputText { text } = &items[0] else { + panic!("expected InputContent::InputText"); + }; + assert_eq!(text, "assistant note"); + } + + #[test] + fn test_extract_input_items_preserves_items_list() { + let items = vec![ + InputItem::Message(InputMessage { + role: MessageRole::User, + content: MessageContent::Items(vec![InputContent::InputText { + text: "first".to_string(), + }]), + }), + InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: MessageContent::Items(vec![InputContent::InputText { + text: "second".to_string(), + }]), + }), + ]; + + let extracted = extract_input_items(&InputParam::Items(items.clone())); + assert_eq!(extracted.len(), items.len()); + + let InputItem::Message(first) = &extracted[0] else { + panic!("expected first item to be message"); + }; + assert!(matches!(first.role, MessageRole::User)); 
+ let MessageContent::Items(first_items) = &first.content else { + panic!("expected MessageContent::Items"); + }; + let InputContent::InputText { text: first_text } = &first_items[0] else { + panic!("expected InputContent::InputText"); + }; + assert_eq!(first_text, "first"); + + let InputItem::Message(second) = &extracted[1] else { + panic!("expected second item to be message"); + }; + assert!(matches!(second.role, MessageRole::Assistant)); + let MessageContent::Items(second_items) = &second.content else { + panic!("expected MessageContent::Items"); + }; + let InputContent::InputText { text: second_text } = &second_items[0] else { + panic!("expected InputContent::InputText"); + }; + assert_eq!(second_text, "second"); + } +} diff --git a/crates/hermesllm/src/apis/openai.rs b/crates/hermesllm/src/apis/openai.rs index 53eee442..33f55b29 100644 --- a/crates/hermesllm/src/apis/openai.rs +++ b/crates/hermesllm/src/apis/openai.rs @@ -108,7 +108,7 @@ pub struct ChatCompletionsRequest { pub top_p: Option, pub top_logprobs: Option, pub user: Option, - // pub web_search: Option, // GOOD FIRST ISSUE: Future support for web search + pub web_search_options: Option, // VLLM-specific parameters (used by Arch-Function) pub top_k: Option, diff --git a/crates/hermesllm/src/apis/openai_responses.rs b/crates/hermesllm/src/apis/openai_responses.rs index 65f4dfa0..eac8a452 100644 --- a/crates/hermesllm/src/apis/openai_responses.rs +++ b/crates/hermesllm/src/apis/openai_responses.rs @@ -116,6 +116,8 @@ pub enum InputParam { Text(String), /// Array of input items (messages, references, outputs, etc.) Items(Vec), + /// Single input item (some clients send object instead of array) + SingleItem(InputItem), } /// Input item - can be a message, item reference, function call output, etc. 
@@ -130,12 +132,20 @@ pub enum InputItem { item_type: String, id: String, }, + /// Function call emitted by model in prior turn + FunctionCall { + #[serde(rename = "type")] + item_type: String, + name: String, + arguments: String, + call_id: String, + }, /// Function call output FunctionCallOutput { #[serde(rename = "type")] item_type: String, call_id: String, - output: String, + output: serde_json::Value, }, } @@ -166,6 +176,7 @@ pub enum MessageRole { Assistant, System, Developer, + Tool, } /// Input content types @@ -173,6 +184,7 @@ pub enum MessageRole { #[serde(tag = "type", rename_all = "snake_case")] pub enum InputContent { /// Text input + #[serde(rename = "input_text", alias = "text", alias = "output_text")] InputText { text: String }, /// Image input via URL InputImage { @@ -180,6 +192,7 @@ pub enum InputContent { detail: Option, }, /// File input via URL + #[serde(rename = "input_file", alias = "file")] InputFile { file_url: String }, /// Audio input InputAudio { @@ -207,10 +220,11 @@ pub struct AudioConfig { } /// Text configuration +#[skip_serializing_none] #[derive(Debug, Clone, Serialize, Deserialize)] pub struct TextConfig { /// Text format configuration - pub format: TextFormat, + pub format: Option, } /// Text format @@ -285,6 +299,7 @@ pub enum Tool { filters: Option, }, /// Web search tool + #[serde(rename = "web_search", alias = "web_search_preview")] WebSearchPreview { domains: Option>, search_context_size: Option, @@ -298,6 +313,12 @@ pub enum Tool { display_height_px: Option, display_number: Option, }, + /// Custom tool (provider/SDK-specific tool contract) + Custom { + name: Option, + description: Option, + format: Option, + }, } /// Ranking options for file search @@ -1015,6 +1036,30 @@ pub struct ListInputItemsResponse { // ProviderRequest Implementation // ============================================================================ +fn append_input_content_text(buffer: &mut String, content: &InputContent) { + match content { + 
InputContent::InputText { text } => buffer.push_str(text), + InputContent::InputImage { .. } => buffer.push_str("[Image]"), + InputContent::InputFile { .. } => buffer.push_str("[File]"), + InputContent::InputAudio { .. } => buffer.push_str("[Audio]"), + } +} + +fn append_content_items_text(buffer: &mut String, content_items: &[InputContent]) { + for content in content_items { + // Preserve existing behavior: each content item is prefixed with a space. + buffer.push(' '); + append_input_content_text(buffer, content); + } +} + +fn append_message_content_text(buffer: &mut String, content: &MessageContent) { + match content { + MessageContent::Text(text) => buffer.push_str(text), + MessageContent::Items(content_items) => append_content_items_text(buffer, content_items), + } +} + impl ProviderRequest for ResponsesAPIRequest { fn model(&self) -> &str { &self.model @@ -1031,36 +1076,27 @@ impl ProviderRequest for ResponsesAPIRequest { fn extract_messages_text(&self) -> String { match &self.input { InputParam::Text(text) => text.clone(), - InputParam::Items(items) => { - items.iter().fold(String::new(), |acc, item| { - match item { - InputItem::Message(msg) => { - let content_text = match &msg.content { - MessageContent::Text(text) => text.clone(), - MessageContent::Items(content_items) => { - content_items.iter().fold(String::new(), |acc, content| { - acc + " " - + &match content { - InputContent::InputText { text } => text.clone(), - InputContent::InputImage { .. } => { - "[Image]".to_string() - } - InputContent::InputFile { .. } => { - "[File]".to_string() - } - InputContent::InputAudio { .. } => { - "[Audio]".to_string() - } - } - }) - } - }; - acc + " " + &content_text - } - // Skip non-message items (references, outputs, etc.) - _ => acc, + InputParam::SingleItem(item) => { + // Normalize single-item input for extraction behavior parity. 
+ match item { + InputItem::Message(msg) => { + let mut extracted = String::new(); + append_message_content_text(&mut extracted, &msg.content); + extracted } - }) + _ => String::new(), + } + } + InputParam::Items(items) => { + let mut extracted = String::new(); + for item in items { + if let InputItem::Message(msg) = item { + // Preserve existing behavior: each message is prefixed with a space. + extracted.push(' '); + append_message_content_text(&mut extracted, &msg.content); + } + } + extracted } } } @@ -1068,6 +1104,20 @@ impl ProviderRequest for ResponsesAPIRequest { fn get_recent_user_message(&self) -> Option { match &self.input { InputParam::Text(text) => Some(text.clone()), + InputParam::SingleItem(item) => match item { + InputItem::Message(msg) if matches!(msg.role, MessageRole::User) => { + match &msg.content { + MessageContent::Text(text) => Some(text.clone()), + MessageContent::Items(content_items) => { + content_items.iter().find_map(|content| match content { + InputContent::InputText { text } => Some(text.clone()), + _ => None, + }) + } + } + } + _ => None, + }, InputParam::Items(items) => { items.iter().rev().find_map(|item| { match item { @@ -1097,6 +1147,9 @@ impl ProviderRequest for ResponsesAPIRequest { .iter() .filter_map(|tool| match tool { Tool::Function { name, .. } => Some(name.clone()), + Tool::Custom { + name: Some(name), .. 
+ } => Some(name.clone()), // Other tool types don't have user-defined names _ => None, }) @@ -1366,6 +1419,7 @@ impl crate::providers::streaming_response::ProviderStreamResponse for ResponsesA #[cfg(test)] mod tests { use super::*; + use serde_json::json; #[test] fn test_response_output_text_delta_deserialization() { @@ -1506,4 +1560,87 @@ mod tests { _ => panic!("Expected ResponseCompleted event"), } } + + #[test] + fn test_request_deserializes_custom_tool() { + let request = json!({ + "model": "gpt-5.3-codex", + "input": "apply the patch", + "tools": [ + { + "type": "custom", + "name": "run_patch", + "description": "Apply patch text", + "format": { + "kind": "patch", + "version": "v1" + } + } + ] + }); + + let bytes = serde_json::to_vec(&request).unwrap(); + let parsed = ResponsesAPIRequest::try_from(bytes.as_slice()).unwrap(); + let tools = parsed.tools.expect("tools should be present"); + assert_eq!(tools.len(), 1); + + match &tools[0] { + Tool::Custom { + name, + description, + format, + } => { + assert_eq!(name.as_deref(), Some("run_patch")); + assert_eq!(description.as_deref(), Some("Apply patch text")); + assert!(format.is_some()); + } + _ => panic!("expected custom tool"), + } + } + + #[test] + fn test_request_deserializes_web_search_tool_alias() { + let request = json!({ + "model": "gpt-5.3-codex", + "input": "find repository info", + "tools": [ + { + "type": "web_search", + "domains": ["github.com"], + "search_context_size": "medium" + } + ] + }); + + let bytes = serde_json::to_vec(&request).unwrap(); + let parsed = ResponsesAPIRequest::try_from(bytes.as_slice()).unwrap(); + let tools = parsed.tools.expect("tools should be present"); + assert_eq!(tools.len(), 1); + + match &tools[0] { + Tool::WebSearchPreview { + domains, + search_context_size, + .. 
+ } => { + assert_eq!(domains.as_ref().map(Vec::len), Some(1)); + assert_eq!(search_context_size.as_deref(), Some("medium")); + } + _ => panic!("expected web search preview tool"), + } + } + + #[test] + fn test_request_deserializes_text_config_without_format() { + let request = json!({ + "model": "gpt-5.3-codex", + "input": "hello", + "text": {} + }); + + let bytes = serde_json::to_vec(&request).unwrap(); + let parsed = ResponsesAPIRequest::try_from(bytes.as_slice()).unwrap(); + assert!(parsed.text.is_some()); + assert!(parsed.text.unwrap().format.is_none()); + } } diff --git a/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs b/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs index 2aeb34ac..92589ccf 100644 --- a/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs +++ b/crates/hermesllm/src/apis/streaming_shapes/responses_api_streaming_buffer.rs @@ -74,6 +74,7 @@ pub struct ResponsesAPIStreamBuffer { /// Lifecycle state flags created_emitted: bool, in_progress_emitted: bool, + finalized: bool, /// Track which output items we've added output_items_added: HashMap, // output_index -> item_id @@ -109,6 +110,7 @@ impl ResponsesAPIStreamBuffer { upstream_response_metadata: None, created_emitted: false, in_progress_emitted: false, + finalized: false, output_items_added: HashMap::new(), text_content: HashMap::new(), function_arguments: HashMap::new(), @@ -236,7 +238,7 @@ impl ResponsesAPIStreamBuffer { }), store: Some(true), text: Some(TextConfig { - format: TextFormat::Text, + format: Some(TextFormat::Text), }), audio: None, modalities: None, @@ -255,8 +257,38 @@ impl ResponsesAPIStreamBuffer { /// Finalize the response by emitting all *.done events and response.completed. /// Call this when the stream is complete (after seeing [DONE] or end_of_stream). pub fn finalize(&mut self) { + // Idempotent finalize: avoid duplicate response.completed loops. 
+ if self.finalized { + return; + } + self.finalized = true; + let mut events = Vec::new(); + // Ensure lifecycle prelude is emitted even if finalize is triggered + // by finish_reason before any prior delta was processed. + if !self.created_emitted { + if self.response_id.is_none() { + self.response_id = Some(format!( + "resp_{}", + uuid::Uuid::new_v4().to_string().replace("-", "") + )); + self.created_at = Some( + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_secs() as i64, + ); + self.model = Some("unknown".to_string()); + } + events.push(self.create_response_created_event()); + self.created_emitted = true; + } + if !self.in_progress_emitted { + events.push(self.create_response_in_progress_event()); + self.in_progress_emitted = true; + } + // Emit done events for all accumulated content // Text content done events @@ -443,6 +475,12 @@ impl SseStreamBufferTrait for ResponsesAPIStreamBuffer { } }; + // Explicit completion marker from transform layer. + if matches!(stream_event.as_ref(), ResponsesAPIStreamEvent::Done { .. 
}) { + self.finalize(); + return; + } + let mut events = Vec::new(); // Capture upstream metadata from ResponseCreated or ResponseInProgress if present @@ -789,4 +827,30 @@ mod tests { println!("✓ NO completion events (partial stream, no [DONE])"); println!("✓ Arguments accumulated: '{{\"location\":\"'\n"); } + + #[test] + fn test_finish_reason_without_done_still_finalizes_once() { + let raw_input = r#"data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"},"finish_reason":null}]} + +data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}"#; + + let client_api = SupportedAPIsFromClient::OpenAIResponsesAPI(OpenAIApi::Responses); + let upstream_api = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions); + + let stream_iter = SseStreamIter::try_from(raw_input.as_bytes()).unwrap(); + let mut buffer = ResponsesAPIStreamBuffer::new(); + + for raw_event in stream_iter { + let transformed_event = + SseEvent::try_from((raw_event, &client_api, &upstream_api)).unwrap(); + buffer.add_transformed_event(transformed_event); + } + + let output = String::from_utf8_lossy(&buffer.to_bytes()).to_string(); + let completed_count = output.matches("event: response.completed").count(); + assert_eq!( + completed_count, 1, + "response.completed should be emitted exactly once" + ); + } } diff --git a/crates/hermesllm/src/clients/endpoints.rs b/crates/hermesllm/src/clients/endpoints.rs index eff96cc5..23e14604 100644 --- a/crates/hermesllm/src/clients/endpoints.rs +++ b/crates/hermesllm/src/clients/endpoints.rs @@ -184,8 +184,8 @@ impl SupportedAPIsFromClient { SupportedAPIsFromClient::OpenAIResponsesAPI(_) => { // For Responses API, check if provider supports it, otherwise translate to chat/completions match provider_id { - // OpenAI and compatible providers that 
support /v1/responses - ProviderId::OpenAI => route_by_provider("/responses"), + // Providers that support /v1/responses natively + ProviderId::OpenAI | ProviderId::XAI => route_by_provider("/responses"), // All other providers: translate to /chat/completions _ => route_by_provider("/chat/completions"), } @@ -654,4 +654,19 @@ mod tests { "/custom/azure/path/gpt-4-deployment/chat/completions?api-version=2025-01-01-preview" ); } + + #[test] + fn test_responses_api_targets_xai_native_responses_endpoint() { + let api = SupportedAPIsFromClient::OpenAIResponsesAPI(OpenAIApi::Responses); + assert_eq!( + api.target_endpoint_for_provider( + &ProviderId::XAI, + "/v1/responses", + "grok-4-1-fast-reasoning", + false, + None + ), + "/v1/responses" + ); + } } diff --git a/crates/hermesllm/src/providers/id.rs b/crates/hermesllm/src/providers/id.rs index fff73f15..11008711 100644 --- a/crates/hermesllm/src/providers/id.rs +++ b/crates/hermesllm/src/providers/id.rs @@ -166,10 +166,11 @@ impl ProviderId { SupportedAPIsFromClient::OpenAIChatCompletions(_), ) => SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions), - // OpenAI Responses API - only OpenAI supports this - (ProviderId::OpenAI, SupportedAPIsFromClient::OpenAIResponsesAPI(_)) => { - SupportedUpstreamAPIs::OpenAIResponsesAPI(OpenAIApi::Responses) - } + // OpenAI Responses API - OpenAI and xAI support this natively + ( + ProviderId::OpenAI | ProviderId::XAI, + SupportedAPIsFromClient::OpenAIResponsesAPI(_), + ) => SupportedUpstreamAPIs::OpenAIResponsesAPI(OpenAIApi::Responses), // Amazon Bedrock natively supports Bedrock APIs (ProviderId::AmazonBedrock, SupportedAPIsFromClient::OpenAIChatCompletions(_)) => { @@ -328,4 +329,16 @@ mod tests { "AmazonBedrock should have models (mapped to amazon)" ); } + + #[test] + fn test_xai_uses_responses_api_for_responses_clients() { + use crate::clients::endpoints::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; + + let client_api = 
SupportedAPIsFromClient::OpenAIResponsesAPI(OpenAIApi::Responses); + let upstream = ProviderId::XAI.compatible_api_for_client(&client_api, false); + assert!(matches!( + upstream, + SupportedUpstreamAPIs::OpenAIResponsesAPI(OpenAIApi::Responses) + )); + } } diff --git a/crates/hermesllm/src/providers/request.rs b/crates/hermesllm/src/providers/request.rs index e97e8a68..92688133 100644 --- a/crates/hermesllm/src/providers/request.rs +++ b/crates/hermesllm/src/providers/request.rs @@ -5,6 +5,7 @@ use crate::apis::amazon_bedrock::{ConverseRequest, ConverseStreamRequest}; use crate::apis::openai_responses::ResponsesAPIRequest; use crate::clients::endpoints::SupportedAPIsFromClient; use crate::clients::endpoints::SupportedUpstreamAPIs; +use crate::ProviderId; use serde_json::Value; use std::collections::HashMap; @@ -70,6 +71,25 @@ impl ProviderRequestType { Self::ResponsesAPIRequest(r) => r.set_messages(messages), } } + + /// Apply provider-specific request normalization before sending upstream. + pub fn normalize_for_upstream( + &mut self, + provider_id: ProviderId, + upstream_api: &SupportedUpstreamAPIs, + ) { + if provider_id == ProviderId::XAI + && matches!( + upstream_api, + SupportedUpstreamAPIs::OpenAIChatCompletions(_) + ) + { + if let Self::ChatCompletionsRequest(req) = self { + // xAI's legacy live-search shape is deprecated on chat/completions. 
+ req.web_search_options = None; + } + } + } } impl ProviderRequest for ProviderRequestType { @@ -787,6 +807,62 @@ mod tests { } } + #[test] + fn test_normalize_for_upstream_xai_clears_chat_web_search_options() { + use crate::apis::openai::{Message, MessageContent, OpenAIApi, Role}; + + let mut request = ProviderRequestType::ChatCompletionsRequest(ChatCompletionsRequest { + model: "grok-4".to_string(), + messages: vec![Message { + role: Role::User, + content: Some(MessageContent::Text("hello".to_string())), + name: None, + tool_calls: None, + tool_call_id: None, + }], + web_search_options: Some(serde_json::json!({"search_context_size":"medium"})), + ..Default::default() + }); + + request.normalize_for_upstream( + ProviderId::XAI, + &SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions), + ); + + let ProviderRequestType::ChatCompletionsRequest(req) = request else { + panic!("expected chat request"); + }; + assert!(req.web_search_options.is_none()); + } + + #[test] + fn test_normalize_for_upstream_non_xai_keeps_chat_web_search_options() { + use crate::apis::openai::{Message, MessageContent, OpenAIApi, Role}; + + let mut request = ProviderRequestType::ChatCompletionsRequest(ChatCompletionsRequest { + model: "gpt-4o".to_string(), + messages: vec![Message { + role: Role::User, + content: Some(MessageContent::Text("hello".to_string())), + name: None, + tool_calls: None, + tool_call_id: None, + }], + web_search_options: Some(serde_json::json!({"search_context_size":"medium"})), + ..Default::default() + }); + + request.normalize_for_upstream( + ProviderId::OpenAI, + &SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions), + ); + + let ProviderRequestType::ChatCompletionsRequest(req) = request else { + panic!("expected chat request"); + }; + assert!(req.web_search_options.is_some()); + } + #[test] fn test_responses_api_to_anthropic_messages_conversion() { use crate::apis::anthropic::AnthropicApi::Messages; diff --git 
a/crates/hermesllm/src/transforms/request/from_openai.rs b/crates/hermesllm/src/transforms/request/from_openai.rs index ddc3b1ca..f2e8ab0d 100644 --- a/crates/hermesllm/src/transforms/request/from_openai.rs +++ b/crates/hermesllm/src/transforms/request/from_openai.rs @@ -10,7 +10,8 @@ use crate::apis::anthropic::{ ToolResultContent, }; use crate::apis::openai::{ - ChatCompletionsRequest, Message, MessageContent, Role, Tool, ToolChoice, ToolChoiceType, + ChatCompletionsRequest, FunctionCall as OpenAIFunctionCall, Message, MessageContent, Role, + Tool, ToolCall as OpenAIToolCall, ToolChoice, ToolChoiceType, }; use crate::apis::openai_responses::{ @@ -65,6 +66,14 @@ impl TryFrom for Vec { Ok(messages) } + InputParam::SingleItem(item) => { + // Some clients send a single object instead of an array. + let nested = ResponsesInputConverter { + input: InputParam::Items(vec![item]), + instructions: converter.instructions, + }; + Vec::::try_from(nested) + } InputParam::Items(items) => { // Convert input items to messages let mut converted_messages = Vec::new(); @@ -82,82 +91,145 @@ impl TryFrom for Vec { // Convert each input item for item in items { - if let InputItem::Message(input_msg) = item { - let role = match input_msg.role { - MessageRole::User => Role::User, - MessageRole::Assistant => Role::Assistant, - MessageRole::System => Role::System, - MessageRole::Developer => Role::System, // Map developer to system - }; + match item { + InputItem::Message(input_msg) => { + let role = match input_msg.role { + MessageRole::User => Role::User, + MessageRole::Assistant => Role::Assistant, + MessageRole::System => Role::System, + MessageRole::Developer => Role::System, // Map developer to system + MessageRole::Tool => Role::Tool, + }; - // Convert content based on MessageContent type - let content = match &input_msg.content { - crate::apis::openai_responses::MessageContent::Text(text) => { - // Simple text content - MessageContent::Text(text.clone()) - } - 
crate::apis::openai_responses::MessageContent::Items(content_items) => { - // Check if it's a single text item (can use simple text format) - if content_items.len() == 1 { - if let InputContent::InputText { text } = &content_items[0] { - MessageContent::Text(text.clone()) + // Convert content based on MessageContent type + let content = match &input_msg.content { + crate::apis::openai_responses::MessageContent::Text(text) => { + // Simple text content + MessageContent::Text(text.clone()) + } + crate::apis::openai_responses::MessageContent::Items( + content_items, + ) => { + // Check if it's a single text item (can use simple text format) + if content_items.len() == 1 { + if let InputContent::InputText { text } = &content_items[0] + { + MessageContent::Text(text.clone()) + } else { + // Single non-text item - use parts format + MessageContent::Parts( + content_items + .iter() + .filter_map(|c| match c { + InputContent::InputText { text } => { + Some(crate::apis::openai::ContentPart::Text { + text: text.clone(), + }) + } + InputContent::InputImage { image_url, .. } => { + Some(crate::apis::openai::ContentPart::ImageUrl { + image_url: crate::apis::openai::ImageUrl { + url: image_url.clone(), + detail: None, + }, + }) + } + InputContent::InputFile { .. } => None, // Skip files for now + InputContent::InputAudio { .. } => None, // Skip audio for now + }) + .collect(), + ) + } } else { - // Single non-text item - use parts format + // Multiple content items - convert to parts MessageContent::Parts( - content_items.iter() + content_items + .iter() .filter_map(|c| match c { InputContent::InputText { text } => { - Some(crate::apis::openai::ContentPart::Text { text: text.clone() }) + Some(crate::apis::openai::ContentPart::Text { + text: text.clone(), + }) } InputContent::InputImage { image_url, .. } => { Some(crate::apis::openai::ContentPart::ImageUrl { image_url: crate::apis::openai::ImageUrl { url: image_url.clone(), detail: None, - } + }, }) } InputContent::InputFile { .. 
} => None, // Skip files for now InputContent::InputAudio { .. } => None, // Skip audio for now }) - .collect() + .collect(), ) } - } else { - // Multiple content items - convert to parts - MessageContent::Parts( - content_items - .iter() - .filter_map(|c| match c { - InputContent::InputText { text } => { - Some(crate::apis::openai::ContentPart::Text { - text: text.clone(), - }) - } - InputContent::InputImage { image_url, .. } => Some( - crate::apis::openai::ContentPart::ImageUrl { - image_url: crate::apis::openai::ImageUrl { - url: image_url.clone(), - detail: None, - }, - }, - ), - InputContent::InputFile { .. } => None, // Skip files for now - InputContent::InputAudio { .. } => None, // Skip audio for now - }) - .collect(), - ) + } + }; + + converted_messages.push(Message { + role, + content: Some(content), + name: None, + tool_call_id: None, + tool_calls: None, + }); + } + InputItem::FunctionCallOutput { + item_type: _, + call_id, + output, + } => { + // Preserve tool result so upstream models do not re-issue the same tool call. + let output_text = match output { + serde_json::Value::String(s) => s.clone(), + other => serde_json::to_string(&other).unwrap_or_default(), + }; + converted_messages.push(Message { + role: Role::Tool, + content: Some(MessageContent::Text(output_text)), + name: None, + tool_call_id: Some(call_id), + tool_calls: None, + }); + } + InputItem::FunctionCall { + item_type: _, + name, + arguments, + call_id, + } => { + let tool_call = OpenAIToolCall { + id: call_id, + call_type: "function".to_string(), + function: OpenAIFunctionCall { name, arguments }, + }; + + // Prefer attaching tool_calls to the preceding assistant message when present. 
+ if let Some(last) = converted_messages.last_mut() { + if matches!(last.role, Role::Assistant) { + if let Some(existing) = &mut last.tool_calls { + existing.push(tool_call); + } else { + last.tool_calls = Some(vec![tool_call]); + } + continue; } } - }; - converted_messages.push(Message { - role, - content: Some(content), - name: None, - tool_call_id: None, - tool_calls: None, - }); + converted_messages.push(Message { + role: Role::Assistant, + content: None, + name: None, + tool_call_id: None, + tool_calls: Some(vec![tool_call]), + }); + } + InputItem::ItemReference { .. } => { + // Item references/unknown entries are metadata-like and can be skipped + // for chat-completions conversion. + } } } @@ -397,6 +469,170 @@ impl TryFrom for ChatCompletionsRequest { type Error = TransformError; fn try_from(req: ResponsesAPIRequest) -> Result { + fn normalize_function_parameters( + parameters: Option, + fallback_extra: Option, + ) -> serde_json::Value { + // ChatCompletions function tools require JSON Schema with top-level type=object. + let mut base = serde_json::json!({ + "type": "object", + "properties": {}, + }); + + if let Some(serde_json::Value::Object(mut obj)) = parameters { + // Enforce a valid object schema shape regardless of upstream tool format. 
+ obj.insert( + "type".to_string(), + serde_json::Value::String("object".to_string()), + ); + if !obj.contains_key("properties") { + obj.insert( + "properties".to_string(), + serde_json::Value::Object(serde_json::Map::new()), + ); + } + base = serde_json::Value::Object(obj); + } + + if let Some(extra) = fallback_extra { + if let serde_json::Value::Object(ref mut map) = base { + map.insert("x-custom-format".to_string(), extra); + } + } + + base + } + + let mut converted_chat_tools: Vec = Vec::new(); + let mut web_search_options: Option = None; + + if let Some(tools) = req.tools.clone() { + for (idx, tool) in tools.into_iter().enumerate() { + match tool { + ResponsesTool::Function { + name, + description, + parameters, + strict, + } => converted_chat_tools.push(Tool { + tool_type: "function".to_string(), + function: crate::apis::openai::Function { + name, + description, + parameters: normalize_function_parameters(parameters, None), + strict, + }, + }), + ResponsesTool::WebSearchPreview { + search_context_size, + user_location, + .. 
+ } => { + if web_search_options.is_none() { + let user_location_value = user_location.map(|loc| { + let mut approx = serde_json::Map::new(); + if let Some(city) = loc.city { + approx.insert( + "city".to_string(), + serde_json::Value::String(city), + ); + } + if let Some(country) = loc.country { + approx.insert( + "country".to_string(), + serde_json::Value::String(country), + ); + } + if let Some(region) = loc.region { + approx.insert( + "region".to_string(), + serde_json::Value::String(region), + ); + } + if let Some(timezone) = loc.timezone { + approx.insert( + "timezone".to_string(), + serde_json::Value::String(timezone), + ); + } + + serde_json::json!({ + "type": loc.location_type, + "approximate": serde_json::Value::Object(approx), + }) + }); + + let mut web_search = serde_json::Map::new(); + if let Some(size) = search_context_size { + web_search.insert( + "search_context_size".to_string(), + serde_json::Value::String(size), + ); + } + if let Some(location) = user_location_value { + web_search.insert("user_location".to_string(), location); + } + web_search_options = Some(serde_json::Value::Object(web_search)); + } + } + ResponsesTool::Custom { + name, + description, + format, + } => { + // Custom tools do not have a strict ChatCompletions equivalent for all + // providers. Map them to a permissive function tool for compatibility. + let tool_name = name.unwrap_or_else(|| format!("custom_tool_{}", idx + 1)); + let parameters = normalize_function_parameters( + Some(serde_json::json!({ + "type": "object", + "properties": { + "input": { "type": "string" } + }, + "required": ["input"], + "additionalProperties": true, + })), + format, + ); + + converted_chat_tools.push(Tool { + tool_type: "function".to_string(), + function: crate::apis::openai::Function { + name: tool_name, + description, + parameters, + strict: Some(false), + }, + }); + } + ResponsesTool::FileSearch { .. 
} => { + return Err(TransformError::UnsupportedConversion( + "FileSearch tool is not supported in ChatCompletions API. Only function/custom/web search tools are supported in this conversion." + .to_string(), + )); + } + ResponsesTool::CodeInterpreter => { + return Err(TransformError::UnsupportedConversion( + "CodeInterpreter tool is not supported in ChatCompletions API conversion." + .to_string(), + )); + } + ResponsesTool::Computer { .. } => { + return Err(TransformError::UnsupportedConversion( + "Computer tool is not supported in ChatCompletions API conversion." + .to_string(), + )); + } + } + } + } + + let tools = if converted_chat_tools.is_empty() { + None + } else { + Some(converted_chat_tools) + }; + // Convert input to messages using the shared converter let converter = ResponsesInputConverter { input: req.input, @@ -418,57 +654,24 @@ impl TryFrom for ChatCompletionsRequest { service_tier: req.service_tier, top_logprobs: req.top_logprobs.map(|t| t as u32), modalities: req.modalities.map(|mods| { - mods.into_iter().map(|m| { - match m { + mods.into_iter() + .map(|m| match m { Modality::Text => "text".to_string(), Modality::Audio => "audio".to_string(), - } - }).collect() + }) + .collect() }), - stream_options: req.stream_options.map(|opts| { - crate::apis::openai::StreamOptions { + stream_options: req + .stream_options + .map(|opts| crate::apis::openai::StreamOptions { include_usage: opts.include_usage, - } + }), + reasoning_effort: req.reasoning_effort.map(|effort| match effort { + ReasoningEffort::Low => "low".to_string(), + ReasoningEffort::Medium => "medium".to_string(), + ReasoningEffort::High => "high".to_string(), }), - reasoning_effort: req.reasoning_effort.map(|effort| { - match effort { - ReasoningEffort::Low => "low".to_string(), - ReasoningEffort::Medium => "medium".to_string(), - ReasoningEffort::High => "high".to_string(), - } - }), - tools: req.tools.map(|tools| { - tools.into_iter().map(|tool| { - - // Only convert Function tools - other types 
are not supported in ChatCompletions - match tool { - ResponsesTool::Function { name, description, parameters, strict } => Ok(Tool { - tool_type: "function".to_string(), - function: crate::apis::openai::Function { - name, - description, - parameters: parameters.unwrap_or_else(|| serde_json::json!({ - "type": "object", - "properties": {} - })), - strict, - } - }), - ResponsesTool::FileSearch { .. } => Err(TransformError::UnsupportedConversion( - "FileSearch tool is not supported in ChatCompletions API. Only function tools are supported.".to_string() - )), - ResponsesTool::WebSearchPreview { .. } => Err(TransformError::UnsupportedConversion( - "WebSearchPreview tool is not supported in ChatCompletions API. Only function tools are supported.".to_string() - )), - ResponsesTool::CodeInterpreter => Err(TransformError::UnsupportedConversion( - "CodeInterpreter tool is not supported in ChatCompletions API. Only function tools are supported.".to_string() - )), - ResponsesTool::Computer { .. } => Err(TransformError::UnsupportedConversion( - "Computer tool is not supported in ChatCompletions API. Only function tools are supported.".to_string() - )), - } - }).collect::, _>>() - }).transpose()?, + tools, tool_choice: req.tool_choice.map(|choice| { match choice { ResponsesToolChoice::String(s) => { @@ -481,11 +684,14 @@ impl TryFrom for ChatCompletionsRequest { } ResponsesToolChoice::Named { function, .. 
} => ToolChoice::Function { choice_type: "function".to_string(), - function: crate::apis::openai::FunctionChoice { name: function.name } - } + function: crate::apis::openai::FunctionChoice { + name: function.name, + }, + }, } }), parallel_tool_calls: req.parallel_tool_calls, + web_search_options, ..Default::default() }) } @@ -1027,4 +1233,235 @@ mod tests { panic!("Expected text content block"); } } + + #[test] + fn test_responses_custom_tool_maps_to_function_tool_for_chat_completions() { + use crate::apis::openai_responses::{ + InputParam, ResponsesAPIRequest, Tool as ResponsesTool, + }; + + let req = ResponsesAPIRequest { + model: "gpt-5.3-codex".to_string(), + input: InputParam::Text("use custom tool".to_string()), + tools: Some(vec![ResponsesTool::Custom { + name: Some("run_patch".to_string()), + description: Some("Apply structured patch".to_string()), + format: Some(serde_json::json!({ + "kind": "patch", + "version": "v1" + })), + }]), + include: None, + parallel_tool_calls: None, + store: None, + instructions: None, + stream: None, + stream_options: None, + conversation: None, + tool_choice: None, + max_output_tokens: None, + temperature: None, + top_p: None, + metadata: None, + previous_response_id: None, + modalities: None, + audio: None, + text: None, + reasoning_effort: None, + truncation: None, + user: None, + max_tool_calls: None, + service_tier: None, + background: None, + top_logprobs: None, + }; + + let converted = ChatCompletionsRequest::try_from(req).expect("conversion should succeed"); + let tools = converted.tools.expect("tools should be present"); + assert_eq!(tools.len(), 1); + assert_eq!(tools[0].tool_type, "function"); + assert_eq!(tools[0].function.name, "run_patch"); + assert_eq!( + tools[0].function.description.as_deref(), + Some("Apply structured patch") + ); + } + + #[test] + fn test_responses_web_search_maps_to_chat_web_search_options() { + use crate::apis::openai_responses::{ + InputParam, ResponsesAPIRequest, Tool as ResponsesTool, 
UserLocation, + }; + + let req = ResponsesAPIRequest { + model: "gpt-5.3-codex".to_string(), + input: InputParam::Text("find project docs".to_string()), + tools: Some(vec![ResponsesTool::WebSearchPreview { + domains: Some(vec!["docs.planoai.dev".to_string()]), + search_context_size: Some("medium".to_string()), + user_location: Some(UserLocation { + location_type: "approximate".to_string(), + city: Some("San Francisco".to_string()), + country: Some("US".to_string()), + region: Some("CA".to_string()), + timezone: Some("America/Los_Angeles".to_string()), + }), + }]), + include: None, + parallel_tool_calls: None, + store: None, + instructions: None, + stream: None, + stream_options: None, + conversation: None, + tool_choice: None, + max_output_tokens: None, + temperature: None, + top_p: None, + metadata: None, + previous_response_id: None, + modalities: None, + audio: None, + text: None, + reasoning_effort: None, + truncation: None, + user: None, + max_tool_calls: None, + service_tier: None, + background: None, + top_logprobs: None, + }; + + let converted = ChatCompletionsRequest::try_from(req).expect("conversion should succeed"); + assert!(converted.web_search_options.is_some()); + } + + #[test] + fn test_responses_function_call_output_maps_to_tool_message() { + use crate::apis::openai_responses::{ + InputItem, InputParam, ResponsesAPIRequest, Tool as ResponsesTool, + }; + + let req = ResponsesAPIRequest { + model: "gpt-5.3-codex".to_string(), + input: InputParam::Items(vec![InputItem::FunctionCallOutput { + item_type: "function_call_output".to_string(), + call_id: "call_123".to_string(), + output: serde_json::json!({"status":"ok","stdout":"hello"}), + }]), + tools: Some(vec![ResponsesTool::Function { + name: "exec_command".to_string(), + description: Some("Execute a shell command".to_string()), + parameters: Some(serde_json::json!({ + "type": "object", + "properties": { + "cmd": { "type": "string" } + }, + "required": ["cmd"] + })), + strict: Some(false), + }]), + 
include: None, + parallel_tool_calls: None, + store: None, + instructions: None, + stream: None, + stream_options: None, + conversation: None, + tool_choice: None, + max_output_tokens: None, + temperature: None, + top_p: None, + metadata: None, + previous_response_id: None, + modalities: None, + audio: None, + text: None, + reasoning_effort: None, + truncation: None, + user: None, + max_tool_calls: None, + service_tier: None, + background: None, + top_logprobs: None, + }; + + let converted = ChatCompletionsRequest::try_from(req).expect("conversion should succeed"); + assert_eq!(converted.messages.len(), 1); + assert!(matches!(converted.messages[0].role, Role::Tool)); + assert_eq!( + converted.messages[0].tool_call_id.as_deref(), + Some("call_123") + ); + } + + #[test] + fn test_responses_function_call_and_output_preserve_call_id_link() { + use crate::apis::openai_responses::{ + InputItem, InputMessage, MessageContent as ResponsesMessageContent, MessageRole, + ResponsesAPIRequest, + }; + + let req = ResponsesAPIRequest { + model: "gpt-5.3-codex".to_string(), + input: InputParam::Items(vec![ + InputItem::Message(InputMessage { + role: MessageRole::Assistant, + content: ResponsesMessageContent::Items(vec![]), + }), + InputItem::FunctionCall { + item_type: "function_call".to_string(), + name: "exec_command".to_string(), + arguments: "{\"cmd\":\"pwd\"}".to_string(), + call_id: "toolu_abc123".to_string(), + }, + InputItem::FunctionCallOutput { + item_type: "function_call_output".to_string(), + call_id: "toolu_abc123".to_string(), + output: serde_json::Value::String("ok".to_string()), + }, + ]), + tools: None, + include: None, + parallel_tool_calls: None, + store: None, + instructions: None, + stream: None, + stream_options: None, + conversation: None, + tool_choice: None, + max_output_tokens: None, + temperature: None, + top_p: None, + metadata: None, + previous_response_id: None, + modalities: None, + audio: None, + text: None, + reasoning_effort: None, + truncation: 
None, + user: None, + max_tool_calls: None, + service_tier: None, + background: None, + top_logprobs: None, + }; + + let converted = ChatCompletionsRequest::try_from(req).expect("conversion should succeed"); + assert_eq!(converted.messages.len(), 2); + + assert!(matches!(converted.messages[0].role, Role::Assistant)); + let tool_calls = converted.messages[0] + .tool_calls + .as_ref() + .expect("assistant tool_calls should be present"); + assert_eq!(tool_calls.len(), 1); + assert_eq!(tool_calls[0].id, "toolu_abc123"); + + assert!(matches!(converted.messages[1].role, Role::Tool)); + assert_eq!( + converted.messages[1].tool_call_id.as_deref(), + Some("toolu_abc123") + ); + } } diff --git a/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs b/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs index 328317bc..4aa719af 100644 --- a/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs +++ b/crates/hermesllm/src/transforms/response_streaming/to_openai_streaming.rs @@ -512,19 +512,12 @@ impl TryFrom for ResponsesAPIStreamEvent { } } - // Handle finish_reason - this is a completion signal - // Return an empty delta that the buffer can use to detect completion + // Handle finish_reason - this is a completion signal. + // Emit an explicit Done marker so the buffering layer can finalize + // even if an upstream [DONE] marker is missing/delayed. 
if choice.finish_reason.is_some() { - // Return a minimal text delta to signal completion - // The buffer will handle the finish_reason and generate response.completed - return Ok(ResponsesAPIStreamEvent::ResponseOutputTextDelta { - item_id: "".to_string(), // Buffer will fill this - output_index: choice.index as i32, - content_index: 0, - delta: "".to_string(), // Empty delta signals completion - logprobs: vec![], - obfuscation: None, - sequence_number: 0, // Buffer will fill this + return Ok(ResponsesAPIStreamEvent::Done { + sequence_number: 0, // Buffer will assign final sequence }); } diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 547ba166..7a353bcb 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -1046,7 +1046,8 @@ impl HttpContext for StreamContext { ); match ProviderRequestType::try_from((deserialized_client_request, upstream)) { - Ok(request) => { + Ok(mut request) => { + request.normalize_for_upstream(self.get_provider_id(), upstream); debug!( "request_id={}: upstream request payload: {}", self.request_identifier(), diff --git a/demos/README.md b/demos/README.md index a2613454..6e467a33 100644 --- a/demos/README.md +++ b/demos/README.md @@ -16,6 +16,7 @@ This directory contains demos showcasing Plano's capabilities as an AI-native pr | [Preference-Based Routing](llm_routing/preference_based_routing/) | Routes prompts to LLMs based on user-defined preferences and task type (e.g. code generation vs. 
understanding) | | [Model Alias Routing](llm_routing/model_alias_routing/) | Maps semantic aliases (`arch.summarize.v1`) to provider-specific models for centralized governance | | [Claude Code Router](llm_routing/claude_code_router/) | Extends Claude Code with multi-provider access and preference-aligned routing for coding tasks | +| [Codex Router](llm_routing/codex_router/) | Extends Codex CLI with multi-provider access and preference-aligned routing for coding tasks | ## Agent Orchestration diff --git a/demos/llm_routing/codex_router/README.md b/demos/llm_routing/codex_router/README.md new file mode 100644 index 00000000..d3662581 --- /dev/null +++ b/demos/llm_routing/codex_router/README.md @@ -0,0 +1,92 @@ +# Codex Router - Multi-Model Access with Intelligent Routing + +Plano extends Codex CLI to access multiple LLM providers through a single interface. This gives you: + +1. **Access to Models**: Connect to OpenAI, Anthropic, xAI, Gemini, and local models via Ollama +2. **Intelligent Routing via Preferences for Coding Tasks**: Configure which models handle specific development tasks: + - Code generation and implementation + - Code understanding and analysis + - Debugging and optimization + - Architecture and system design + +Uses a [1.5B preference-aligned router LLM](https://arxiv.org/abs/2506.16655) to automatically select the best model based on your request type. 
+
+## Benefits
+
+- **Single Interface**: Access multiple LLM providers through the same Codex CLI
+- **Task-Aware Routing**: Requests are analyzed and routed to models based on task type (code generation vs code understanding)
+- **Provider Flexibility**: Add or remove providers without changing your workflow
+- **Routing Transparency**: See which model handles each request and why
+
+## Quick Start
+
+### Prerequisites
+
+```bash
+# Install Codex CLI
+npm install -g @openai/codex
+
+# Install Plano CLI
+pip install planoai
+```
+
+### Step 1: Open the Demo
+
+```bash
+git clone https://github.com/katanemo/arch.git
+cd arch/demos/llm_routing/codex_router
+```
+
+### Step 2: Set API Keys
+
+```bash
+export OPENAI_API_KEY="your-openai-key-here"
+export ANTHROPIC_API_KEY="your-anthropic-key-here"
+export GROK_API_KEY="your-xai-key-here"
+export GEMINI_API_KEY="your-gemini-key-here"
+```
+
+### Step 3: Start Plano
+
+```bash
+planoai up
+# or: uvx planoai up
+```
+
+### Step 4: Launch Codex Through Plano
+
+```bash
+planoai cli-agent codex
+# or: uvx planoai cli-agent codex
+```
+
+By default, `planoai cli-agent codex` starts Codex with `gpt-5.3-codex`. With this demo config:
+
+- `project understanding` prompts are routed to `grok-4-1-fast-non-reasoning`
+- `code generation` prompts are routed to `gpt-5.3-codex`
+
+## Monitor Routing Decisions
+
+In a second terminal:
+
+```bash
+sh pretty_model_resolution.sh
+```
+
+This shows each request model and the final model selected by Plano's router.
+
+## Configuration Highlights
+
+`config.yaml` demonstrates:
+
+- OpenAI default model for Codex sessions (`gpt-5.3-codex`)
+- Routing preference override for project understanding (`xai/grok-4-1-fast-non-reasoning`)
+- Additional providers (Anthropic, xAI, Gemini, Ollama local) to show cross-provider routing support
+
+## Optional Overrides
+
+Set a different Codex session model:
+
+```bash
+planoai cli-agent codex --settings='{"CODEX_MODEL":"gpt-5-2025-08-07"}'
+```
diff --git a/demos/llm_routing/codex_router/config.yaml b/demos/llm_routing/codex_router/config.yaml
new file mode 100644
index 00000000..7cafe641
--- /dev/null
+++ b/demos/llm_routing/codex_router/config.yaml
@@ -0,0 +1,38 @@
+version: v0.3.0
+
+listeners:
+  - type: model
+    name: model_listener
+    port: 12000
+
+model_providers:
+  # OpenAI models used by Codex defaults and preference routing
+  - model: openai/gpt-5.3-codex
+    default: true
+    access_key: $OPENAI_API_KEY
+    routing_preferences:
+      - name: code generation
+        description: generating new code snippets, functions, or boilerplate based on user prompts or requirements
+
+  - model: xai/grok-4-1-fast-non-reasoning
+    access_key: $GROK_API_KEY
+    routing_preferences:
+      - name: project understanding
+        description: understand repository structure, codebase, and code files, readmes, and other documentation
+
+  # Additional providers (optional): Codex can route to any configured model
+  # - model: anthropic/claude-sonnet-4-5
+  #   access_key: $ANTHROPIC_API_KEY
+
+  # - model: xai/grok-4-1-fast-non-reasoning
+  #   access_key: $GROK_API_KEY
+
+  - model: ollama/llama3.1
+    base_url: http://localhost:11434
+
+model_aliases:
+  arch.codex.default:
+    target: gpt-5.3-codex
+
+tracing:
+  random_sampling: 100
diff --git a/demos/llm_routing/codex_router/pretty_model_resolution.sh b/demos/llm_routing/codex_router/pretty_model_resolution.sh
new file mode 100644
index 00000000..b6187e65
--- /dev/null
+++ b/demos/llm_routing/codex_router/pretty_model_resolution.sh
@@ -0,0 +1,33 @@
+#!/usr/bin/env bash +# Pretty-print Plano MODEL_RESOLUTION lines from docker logs +# - hides Arch-Router +# - prints timestamp +# - colors MODEL_RESOLUTION red +# - colors req_model cyan +# - colors resolved_model magenta +# - removes provider and streaming + +docker logs -f plano 2>&1 \ +| awk ' +/MODEL_RESOLUTION:/ && $0 !~ /Arch-Router/ { + # extract timestamp between first [ and ] + ts="" + if (match($0, /\[[0-9-]+ [0-9:.]+\]/)) { + ts=substr($0, RSTART+1, RLENGTH-2) + } + + # split out after MODEL_RESOLUTION: + n = split($0, parts, /MODEL_RESOLUTION: */) + line = parts[2] + + # remove provider and streaming fields + sub(/ *provider='\''[^'\'']+'\''/, "", line) + sub(/ *streaming=(true|false)/, "", line) + + # highlight fields + gsub(/req_model='\''[^'\'']+'\''/, "\033[36m&\033[0m", line) + gsub(/resolved_model='\''[^'\'']+'\''/, "\033[35m&\033[0m", line) + + # print timestamp + MODEL_RESOLUTION + printf "\033[90m[%s]\033[0m \033[31mMODEL_RESOLUTION\033[0m: %s\n", ts, line +}' diff --git a/demos/llm_routing/model_routing_service/README.md b/demos/llm_routing/model_routing_service/README.md new file mode 100644 index 00000000..85d56abf --- /dev/null +++ b/demos/llm_routing/model_routing_service/README.md @@ -0,0 +1,92 @@ +# Model Routing Service Demo + +This demo shows how to use the `/routing/v1/*` endpoints to get routing decisions without proxying requests to an LLM. The endpoint accepts standard LLM request formats and returns which model Plano's router would select. + +## Setup + +Make sure you have Plano CLI installed (`pip install planoai` or `uv tool install planoai`). 
+ +```bash +export OPENAI_API_KEY= +export ANTHROPIC_API_KEY= +``` + +Start Plano: +```bash +cd demos/llm_routing/model_routing_service +planoai up config.yaml +``` + +## Run the demo + +```bash +./demo.sh +``` + +## Endpoints + +All three LLM API formats are supported: + +| Endpoint | Format | +|---|---| +| `POST /routing/v1/chat/completions` | OpenAI Chat Completions | +| `POST /routing/v1/messages` | Anthropic Messages | +| `POST /routing/v1/responses` | OpenAI Responses API | + +## Example + +```bash +curl http://localhost:12000/routing/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [{"role": "user", "content": "Write a Python function for binary search"}] + }' +``` + +Response: +```json +{ + "model": "anthropic/claude-sonnet-4-20250514", + "route": "code_generation", + "trace_id": "c16d1096c1af4a17abb48fb182918a88" +} +``` + +The response tells you which model would handle this request and which route was matched, without actually making the LLM call. + +## Demo Output + +``` +=== Model Routing Service Demo === + +--- 1. Code generation query (OpenAI format) --- +{ + "model": "anthropic/claude-sonnet-4-20250514", + "route": "code_generation", + "trace_id": "c16d1096c1af4a17abb48fb182918a88" +} + +--- 2. Complex reasoning query (OpenAI format) --- +{ + "model": "openai/gpt-4o", + "route": "complex_reasoning", + "trace_id": "30795e228aff4d7696f082ed01b75ad4" +} + +--- 3. Simple query - no routing match (OpenAI format) --- +{ + "model": "none", + "route": null, + "trace_id": "ae0b6c3b220d499fb5298ac63f4eac0e" +} + +--- 4. 
Code generation query (Anthropic format) --- +{ + "model": "anthropic/claude-sonnet-4-20250514", + "route": "code_generation", + "trace_id": "26be822bbdf14a3ba19fe198e55ea4a9" +} + +=== Demo Complete === +``` diff --git a/demos/llm_routing/model_routing_service/config.yaml b/demos/llm_routing/model_routing_service/config.yaml new file mode 100644 index 00000000..7b98b25b --- /dev/null +++ b/demos/llm_routing/model_routing_service/config.yaml @@ -0,0 +1,27 @@ +version: v0.3.0 + +listeners: + - type: model + name: model_listener + port: 12000 + +model_providers: + + - model: openai/gpt-4o-mini + access_key: $OPENAI_API_KEY + default: true + + - model: openai/gpt-4o + access_key: $OPENAI_API_KEY + routing_preferences: + - name: complex_reasoning + description: complex reasoning tasks, multi-step analysis, or detailed explanations + + - model: anthropic/claude-sonnet-4-20250514 + access_key: $ANTHROPIC_API_KEY + routing_preferences: + - name: code_generation + description: generating new code, writing functions, or creating boilerplate + +tracing: + random_sampling: 100 diff --git a/demos/llm_routing/model_routing_service/demo.sh b/demos/llm_routing/model_routing_service/demo.sh new file mode 100755 index 00000000..0c3fdc5d --- /dev/null +++ b/demos/llm_routing/model_routing_service/demo.sh @@ -0,0 +1,120 @@ +#!/bin/bash +set -e + +PLANO_URL="${PLANO_URL:-http://localhost:12000}" + +echo "=== Model Routing Service Demo ===" +echo "" +echo "This demo shows how to use the /routing/v1/* endpoints to get" +echo "routing decisions without actually proxying the request to an LLM." +echo "" + +# --- Example 1: OpenAI Chat Completions format --- +echo "--- 1. 
Code generation query (OpenAI format) ---" +echo "" +curl -s "$PLANO_URL/routing/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "Write a Python function that implements binary search on a sorted array"} + ] + }' | python3 -m json.tool +echo "" + +# --- Example 2: Complex reasoning query --- +echo "--- 2. Complex reasoning query (OpenAI format) ---" +echo "" +curl -s "$PLANO_URL/routing/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "Explain the trade-offs between microservices and monolithic architectures, considering scalability, team structure, and operational complexity"} + ] + }' | python3 -m json.tool +echo "" + +# --- Example 3: Simple query (no routing match) --- +echo "--- 3. Simple query - no routing match (OpenAI format) ---" +echo "" +curl -s "$PLANO_URL/routing/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "What is the capital of France?"} + ] + }' | python3 -m json.tool +echo "" + +# --- Example 4: Anthropic Messages format --- +echo "--- 4. Code generation query (Anthropic format) ---" +echo "" +curl -s "$PLANO_URL/routing/v1/messages" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "max_tokens": 1024, + "messages": [ + {"role": "user", "content": "Create a REST API endpoint in Rust using actix-web that handles user registration"} + ] + }' | python3 -m json.tool +echo "" + +# --- Example 5: Inline routing policy in request body --- +echo "--- 5. 
Inline routing_policy (no config needed) ---" +echo "" +curl -s "$PLANO_URL/routing/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "Write a quicksort implementation in Go"} + ], + "routing_policy": [ + { + "model": "openai/gpt-4o", + "routing_preferences": [ + {"name": "coding", "description": "code generation, writing functions, debugging"} + ] + }, + { + "model": "openai/gpt-4o-mini", + "routing_preferences": [ + {"name": "general", "description": "general questions, simple lookups, casual conversation"} + ] + } + ] + }' | python3 -m json.tool +echo "" + +# --- Example 6: Inline routing policy with Anthropic format --- +echo "--- 6. Inline routing_policy (Anthropic format) ---" +echo "" +curl -s "$PLANO_URL/routing/v1/messages" \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o-mini", + "max_tokens": 1024, + "messages": [ + {"role": "user", "content": "What is the weather like today?"} + ], + "routing_policy": [ + { + "model": "openai/gpt-4o", + "routing_preferences": [ + {"name": "coding", "description": "code generation, writing functions, debugging"} + ] + }, + { + "model": "openai/gpt-4o-mini", + "routing_preferences": [ + {"name": "general", "description": "general questions, simple lookups, casual conversation"} + ] + } + ] + }' | python3 -m json.tool +echo "" + +echo "=== Demo Complete ===" diff --git a/docs/source/resources/deployment.rst b/docs/source/resources/deployment.rst index 71452ea3..7b8b0554 100644 --- a/docs/source/resources/deployment.rst +++ b/docs/source/resources/deployment.rst @@ -100,6 +100,194 @@ You can also use the CLI with Docker mode: planoai up plano_config.yaml --docker planoai down --docker +Kubernetes Deployment +--------------------- + +Plano runs as a single container in Kubernetes. The container bundles Envoy, WASM plugins, and brightstaff, managed by supervisord internally. 
Deploy it as a standard Kubernetes Deployment with your ``plano_config.yaml`` mounted via a ConfigMap and API keys injected via a Secret. + +.. note:: + All environment variables referenced in your ``plano_config.yaml`` (e.g. ``$OPENAI_API_KEY``) must be set in the container environment. Use Kubernetes Secrets for API keys. + +Step 1: Create the Config +~~~~~~~~~~~~~~~~~~~~~~~~~ + +Store your ``plano_config.yaml`` in a ConfigMap: + +.. code-block:: bash + + kubectl create configmap plano-config --from-file=plano_config.yaml=./plano_config.yaml + +Step 2: Create API Key Secrets +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +Store your LLM provider API keys in a Secret: + +.. code-block:: bash + + kubectl create secret generic plano-secrets \ + --from-literal=OPENAI_API_KEY=sk-... \ + --from-literal=ANTHROPIC_API_KEY=sk-ant-... + +Step 3: Deploy Plano +~~~~~~~~~~~~~~~~~~~~ + +Create a ``plano-deployment.yaml``: + +.. code-block:: yaml + + apiVersion: apps/v1 + kind: Deployment + metadata: + name: plano + labels: + app: plano + spec: + replicas: 1 + selector: + matchLabels: + app: plano + template: + metadata: + labels: + app: plano + spec: + containers: + - name: plano + image: katanemo/plano:0.4.11 + ports: + - containerPort: 12000 # LLM gateway (chat completions, model routing) + name: llm-gateway + envFrom: + - secretRef: + name: plano-secrets + env: + - name: LOG_LEVEL + value: "info" + volumeMounts: + - name: plano-config + mountPath: /app/plano_config.yaml + subPath: plano_config.yaml + readOnly: true + readinessProbe: + httpGet: + path: /healthz + port: 12000 + initialDelaySeconds: 5 + periodSeconds: 10 + livenessProbe: + httpGet: + path: /healthz + port: 12000 + initialDelaySeconds: 10 + periodSeconds: 30 + resources: + requests: + memory: "256Mi" + cpu: "250m" + limits: + memory: "512Mi" + cpu: "1000m" + volumes: + - name: plano-config + configMap: + name: plano-config + --- + apiVersion: v1 + kind: Service + metadata: + name: plano + spec: + selector: + app: plano + 
ports: + - name: llm-gateway + port: 12000 + targetPort: 12000 + +Apply it: + +.. code-block:: bash + + kubectl apply -f plano-deployment.yaml + +Step 4: Verify +~~~~~~~~~~~~~~ + +.. code-block:: bash + + # Check pod status + kubectl get pods -l app=plano + + # Check logs + kubectl logs -l app=plano -f + + # Test routing (port-forward for local testing) + kubectl port-forward svc/plano 12000:12000 + + curl -s -H "Content-Type: application/json" \ + -d '{"messages":[{"role":"user","content":"tell me a joke"}], "model":"none"}' \ + http://localhost:12000/v1/chat/completions | jq .model + +Updating Configuration +~~~~~~~~~~~~~~~~~~~~~~ + +To update ``plano_config.yaml``, replace the ConfigMap and restart the pod: + +.. code-block:: bash + + kubectl create configmap plano-config \ + --from-file=plano_config.yaml=./plano_config.yaml \ + --dry-run=client -o yaml | kubectl apply -f - + + kubectl rollout restart deployment/plano + +Enabling OTEL Tracing +~~~~~~~~~~~~~~~~~~~~~ + +Plano emits OpenTelemetry traces for every request — including routing decisions, model selection, and upstream latency. To export traces to an OTEL collector in your cluster, add the ``tracing`` section to your ``plano_config.yaml``: + +.. code-block:: yaml + + tracing: + opentracing_grpc_endpoint: "http://otel-collector.monitoring:4317" + random_sampling: 100 # percentage of requests to trace (1-100) + trace_arch_internal: true # include internal Plano spans + span_attributes: + header_prefixes: # capture request headers as span attributes + - "x-" + static: # add static attributes to all spans + environment: "production" + service: "plano" + +Set the ``OTEL_TRACING_GRPC_ENDPOINT`` environment variable or configure it directly in the config. Plano propagates the ``traceparent`` header end-to-end, so traces correlate across your upstream and downstream services. 
+ +Environment Variables Reference +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +The following environment variables can be set on the container: + +.. list-table:: + :header-rows: 1 + :widths: 30 50 20 + + * - Variable + - Description + - Default + * - ``LOG_LEVEL`` + - Log verbosity (``debug``, ``info``, ``warn``, ``error``) + - ``info`` + * - ``OPENAI_API_KEY`` + - OpenAI API key (if referenced in config) + - + * - ``ANTHROPIC_API_KEY`` + - Anthropic API key (if referenced in config) + - + * - ``OTEL_TRACING_GRPC_ENDPOINT`` + - OTEL collector endpoint for trace export + - ``http://localhost:4317`` + +Any environment variable referenced in ``plano_config.yaml`` with ``$VAR_NAME`` syntax will be substituted at startup. Use Kubernetes Secrets for sensitive values and ConfigMaps or ``env`` entries for non-sensitive configuration. + Runtime Tests -------------