From 8dedf0bec1694c06ee582fc2fcea558468e7486d Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Wed, 8 Apr 2026 17:32:02 -0700 Subject: [PATCH] Model affinity for consistent model selection in agentic loops (#827) --- config/plano_config_schema.yaml | 17 ++ crates/brightstaff/src/handlers/llm/mod.rs | 114 +++++++---- .../src/handlers/routing_service.rs | 63 +++++- crates/brightstaff/src/main.rs | 17 ++ crates/brightstaff/src/router/llm.rs | 182 +++++++++++++++++- crates/common/src/configuration.rs | 9 + crates/common/src/consts.rs | 1 + .../model_routing_service/README.md | 122 ++++++++++++ .../llm_routing/model_routing_service/demo.sh | 43 +++++ docs/routing-api.md | 43 +++++ docs/source/guides/llm_router.rst | 38 ++++ .../includes/plano_config_full_reference.yaml | 5 + .../plano_config_full_reference_rendered.yaml | 3 + 13 files changed, 614 insertions(+), 43 deletions(-) diff --git a/config/plano_config_schema.yaml b/config/plano_config_schema.yaml index 0bdca9e4..d681b089 100644 --- a/config/plano_config_schema.yaml +++ b/config/plano_config_schema.yaml @@ -425,6 +425,23 @@ properties: enum: - llm - prompt + routing: + type: object + properties: + llm_provider: + type: string + model: + type: string + session_ttl_seconds: + type: integer + minimum: 1 + description: TTL in seconds for session-pinned routing cache entries. Default 600 (10 minutes). + session_max_entries: + type: integer + minimum: 1 + maximum: 10000 + description: Maximum number of session-pinned routing cache entries. Default 10000. 
+ additionalProperties: false state_storage: type: object properties: diff --git a/crates/brightstaff/src/handlers/llm/mod.rs b/crates/brightstaff/src/handlers/llm/mod.rs index 1570a2d8..80455cfb 100644 --- a/crates/brightstaff/src/handlers/llm/mod.rs +++ b/crates/brightstaff/src/handlers/llm/mod.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use common::configuration::{FilterPipeline, ModelAlias}; -use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER}; +use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, MODEL_AFFINITY_HEADER}; use common::llm_providers::LlmProviders; use hermesllm::apis::openai::Message; use hermesllm::apis::openai_responses::InputParam; @@ -94,6 +94,21 @@ async fn llm_chat_inner( let traceparent = extract_or_generate_traceparent(&request_headers); + // Session pinning: extract session ID and check cache before routing + let session_id: Option = request_headers + .get(MODEL_AFFINITY_HEADER) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()); + let pinned_model: Option = if let Some(ref sid) = session_id { + state + .router_service + .get_cached_route(sid) + .await + .map(|c| c.model_name) + } else { + None + }; + let full_qualified_llm_provider_url = format!("{}{}", state.llm_provider_url, request_path); // --- Phase 1: Parse and validate the incoming request --- @@ -244,46 +259,65 @@ async fn llm_chat_inner( } }; - // --- Phase 3: Route the request --- - let routing_span = info_span!( - "routing", - component = "routing", - http.method = "POST", - http.target = %request_path, - model.requested = %model_from_request, - model.alias_resolved = %alias_resolved_model, - route.selected_model = tracing::field::Empty, - routing.determination_ms = tracing::field::Empty, - ); - let routing_result = match async { - set_service_name(operation_component::ROUTING); - router_chat_get_upstream_model( - Arc::clone(&state.router_service), - client_request, - &traceparent, - &request_path, - &request_id, - 
inline_routing_preferences, - ) - .await - } - .instrument(routing_span) - .await - { - Ok(result) => result, - Err(err) => { - let mut internal_error = Response::new(full(err.message)); - *internal_error.status_mut() = err.status_code; - return Ok(internal_error); - } - }; - - // Determine final model (router returns "none" when it doesn't select a specific model) - let router_selected_model = routing_result.model_name; - let resolved_model = if router_selected_model != "none" { - router_selected_model + // --- Phase 3: Route the request (or use pinned model from session cache) --- + let resolved_model = if let Some(cached_model) = pinned_model { + info!( + session_id = %session_id.as_deref().unwrap_or(""), + model = %cached_model, + "using pinned routing decision from cache" + ); + cached_model } else { - alias_resolved_model.clone() + let routing_span = info_span!( + "routing", + component = "routing", + http.method = "POST", + http.target = %request_path, + model.requested = %model_from_request, + model.alias_resolved = %alias_resolved_model, + route.selected_model = tracing::field::Empty, + routing.determination_ms = tracing::field::Empty, + ); + let routing_result = match async { + set_service_name(operation_component::ROUTING); + router_chat_get_upstream_model( + Arc::clone(&state.router_service), + client_request, + &traceparent, + &request_path, + &request_id, + inline_routing_preferences, + ) + .await + } + .instrument(routing_span) + .await + { + Ok(result) => result, + Err(err) => { + let mut internal_error = Response::new(full(err.message)); + *internal_error.status_mut() = err.status_code; + return Ok(internal_error); + } + }; + + let (router_selected_model, route_name) = + (routing_result.model_name, routing_result.route_name); + let model = if router_selected_model != "none" { + router_selected_model + } else { + alias_resolved_model.clone() + }; + + // Cache the routing decision so subsequent requests with the same session ID are pinned + if let 
Some(ref sid) = session_id { + state + .router_service + .cache_route(sid.clone(), model.clone(), route_name) + .await; + } + + model }; tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str()); diff --git a/crates/brightstaff/src/handlers/routing_service.rs b/crates/brightstaff/src/handlers/routing_service.rs index 6566a324..d09afe21 100644 --- a/crates/brightstaff/src/handlers/routing_service.rs +++ b/crates/brightstaff/src/handlers/routing_service.rs @@ -1,6 +1,6 @@ use bytes::Bytes; use common::configuration::{SpanAttributes, TopLevelRoutingPreference}; -use common::consts::REQUEST_ID_HEADER; +use common::consts::{MODEL_AFFINITY_HEADER, REQUEST_ID_HEADER}; use common::errors::BrightStaffError; use hermesllm::clients::SupportedAPIsFromClient; use hermesllm::ProviderRequestType; @@ -53,6 +53,9 @@ struct RoutingDecisionResponse { models: Vec, route: Option, trace_id: String, + #[serde(skip_serializing_if = "Option::is_none")] + session_id: Option, + pinned: bool, } pub async fn routing_decision( @@ -68,6 +71,11 @@ pub async fn routing_decision( .map(|s| s.to_string()) .unwrap_or_else(|| uuid::Uuid::new_v4().to_string()); + let session_id: Option = request_headers + .get(MODEL_AFFINITY_HEADER) + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()); + let custom_attrs = collect_custom_trace_attributes(&request_headers, span_attributes.as_ref()); let request_span = info_span!( @@ -85,6 +93,7 @@ pub async fn routing_decision( request_path, request_headers, custom_attrs, + session_id, ) .instrument(request_span) .await @@ -97,6 +106,7 @@ async fn routing_decision_inner( request_path: String, request_headers: hyper::HeaderMap, custom_attrs: std::collections::HashMap, + session_id: Option, ) -> Result>, hyper::Error> { set_service_name(operation_component::ROUTING); opentelemetry::trace::get_active_span(|span| { @@ -114,6 +124,34 @@ async fn routing_decision_inner( .unwrap_or("unknown") .to_string(); + // Session pinning: check cache before 
doing any routing work + if let Some(ref sid) = session_id { + if let Some(cached) = router_service.get_cached_route(sid).await { + info!( + session_id = %sid, + model = %cached.model_name, + route = ?cached.route_name, + "returning pinned routing decision from cache" + ); + let response = RoutingDecisionResponse { + models: vec![cached.model_name], + route: cached.route_name, + trace_id, + session_id: Some(sid.clone()), + pinned: true, + }; + let json = serde_json::to_string(&response).unwrap(); + let body = Full::new(Bytes::from(json)) + .map_err(|never| match never {}) + .boxed(); + return Ok(Response::builder() + .status(StatusCode::OK) + .header("Content-Type", "application/json") + .body(body) + .unwrap()); + } + } + // Parse request body let raw_bytes = request.collect().await?.to_bytes(); @@ -152,7 +190,7 @@ async fn routing_decision_inner( }; let routing_result = router_chat_get_upstream_model( - router_service, + Arc::clone(&router_service), client_request, &traceparent, &request_path, @@ -163,10 +201,23 @@ async fn routing_decision_inner( match routing_result { Ok(result) => { + // Cache the result if session_id is present + if let Some(ref sid) = session_id { + router_service + .cache_route( + sid.clone(), + result.model_name.clone(), + result.route_name.clone(), + ) + .await; + } + let response = RoutingDecisionResponse { models: result.models, route: result.route_name, trace_id, + session_id, + pinned: false, }; info!( @@ -329,6 +380,8 @@ mod tests { ], route: Some("code_generation".to_string()), trace_id: "abc123".to_string(), + session_id: Some("sess-abc".to_string()), + pinned: true, }; let json = serde_json::to_string(&response).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); @@ -336,6 +389,8 @@ mod tests { assert_eq!(parsed["models"][1], "openai/gpt-4o"); assert_eq!(parsed["route"], "code_generation"); assert_eq!(parsed["trace_id"], "abc123"); + assert_eq!(parsed["session_id"], "sess-abc"); + 
assert_eq!(parsed["pinned"], true); } #[test] @@ -344,10 +399,14 @@ mod tests { models: vec!["none".to_string()], route: None, trace_id: "abc123".to_string(), + session_id: None, + pinned: false, }; let json = serde_json::to_string(&response).unwrap(); let parsed: serde_json::Value = serde_json::from_str(&json).unwrap(); assert_eq!(parsed["models"][0], "none"); assert!(parsed["route"].is_null()); + assert!(parsed.get("session_id").is_none()); + assert_eq!(parsed["pinned"], false); } } diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index bc88b60b..f179dc4b 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -174,6 +174,9 @@ async fn init_app_state( .map(|p| p.name.clone()) .unwrap_or_else(|| DEFAULT_ROUTING_LLM_PROVIDER.to_string()); + let session_ttl_seconds = config.routing.as_ref().and_then(|r| r.session_ttl_seconds); + let session_max_entries = config.routing.as_ref().and_then(|r| r.session_max_entries); + // Validate that top-level routing_preferences requires v0.4.0+. 
let config_version = parse_semver(&config.version); let is_v040_plus = config_version >= (0, 4, 0); @@ -300,8 +303,22 @@ async fn init_app_state( format!("{llm_provider_url}{CHAT_COMPLETIONS_PATH}"), routing_model_name, routing_llm_provider, + session_ttl_seconds, + session_max_entries, )); + // Spawn background task to clean up expired session cache entries every 5 minutes + { + let router_service = Arc::clone(&router_service); + tokio::spawn(async move { + let mut interval = tokio::time::interval(std::time::Duration::from_secs(300)); + loop { + interval.tick().await; + router_service.cleanup_expired_sessions().await; + } + }); + } + let orchestrator_model_name: String = overrides .agent_orchestration_model .as_deref() diff --git a/crates/brightstaff/src/router/llm.rs b/crates/brightstaff/src/router/llm.rs index 305c548a..5a208c6e 100644 --- a/crates/brightstaff/src/router/llm.rs +++ b/crates/brightstaff/src/router/llm.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::{collections::HashMap, sync::Arc, time::Duration, time::Instant}; use common::{ configuration::TopLevelRoutingPreference, @@ -9,6 +9,7 @@ use super::router_model::{ModelUsagePreference, RoutingPreference}; use hermesllm::apis::openai::Message; use hyper::header; use thiserror::Error; +use tokio::sync::RwLock; use tracing::{debug, info}; use super::http::{self, post_and_extract_content}; @@ -17,6 +18,17 @@ use super::router_model::RouterModel; use crate::router::router_model_v1; +const DEFAULT_SESSION_TTL_SECONDS: u64 = 600; +const DEFAULT_SESSION_MAX_ENTRIES: usize = 10_000; +const MAX_SESSION_MAX_ENTRIES: usize = 10_000; + +#[derive(Clone, Debug)] +pub struct CachedRoute { + pub model_name: String, + pub route_name: Option, + pub cached_at: Instant, +} + pub struct RouterService { router_url: String, client: reqwest::Client, @@ -24,6 +36,9 @@ pub struct RouterService { routing_provider_name: String, top_level_preferences: HashMap, metrics_service: Option>, + session_cache: 
RwLock>, + session_ttl: Duration, + session_max_entries: usize, } #[derive(Debug, Error)] @@ -44,6 +59,8 @@ impl RouterService { router_url: String, routing_model_name: String, routing_provider_name: String, + session_ttl_seconds: Option, + session_max_entries: Option, ) -> Self { let top_level_preferences: HashMap = top_level_prefs .map_or_else(HashMap::new, |prefs| { @@ -74,6 +91,12 @@ impl RouterService { router_model_v1::MAX_TOKEN_LEN, )); + let session_ttl = + Duration::from_secs(session_ttl_seconds.unwrap_or(DEFAULT_SESSION_TTL_SECONDS)); + let session_max_entries = session_max_entries + .unwrap_or(DEFAULT_SESSION_MAX_ENTRIES) + .min(MAX_SESSION_MAX_ENTRIES); + RouterService { router_url, client: reqwest::Client::new(), @@ -81,6 +104,64 @@ impl RouterService { routing_provider_name, top_level_preferences, metrics_service, + session_cache: RwLock::new(HashMap::new()), + session_ttl, + session_max_entries, + } + } + + /// Look up a cached routing decision by session ID. + /// Returns None if not found or expired. + pub async fn get_cached_route(&self, session_id: &str) -> Option { + let cache = self.session_cache.read().await; + if let Some(entry) = cache.get(session_id) { + if entry.cached_at.elapsed() < self.session_ttl { + return Some(entry.clone()); + } + } + None + } + + /// Store a routing decision in the session cache. + /// If at max capacity, evicts the oldest entry. + pub async fn cache_route( + &self, + session_id: String, + model_name: String, + route_name: Option, + ) { + let mut cache = self.session_cache.write().await; + if cache.len() >= self.session_max_entries && !cache.contains_key(&session_id) { + if let Some(oldest_key) = cache + .iter() + .min_by_key(|(_, v)| v.cached_at) + .map(|(k, _)| k.clone()) + { + cache.remove(&oldest_key); + } + } + cache.insert( + session_id, + CachedRoute { + model_name, + route_name, + cached_at: Instant::now(), + }, + ); + } + + /// Remove all expired entries from the session cache. 
+ pub async fn cleanup_expired_sessions(&self) { + let mut cache = self.session_cache.write().await; + let before = cache.len(); + cache.retain(|_, entry| entry.cached_at.elapsed() < self.session_ttl); + let removed = before - cache.len(); + if removed > 0 { + info!( + removed = removed, + remaining = cache.len(), + "cleaned up expired session cache entries" + ); } } @@ -198,3 +279,102 @@ impl RouterService { Ok(result) } } + +#[cfg(test)] +mod tests { + use super::*; + + fn make_router_service(ttl_seconds: u64, max_entries: usize) -> RouterService { + RouterService::new( + None, + None, + "http://localhost:12001/v1/chat/completions".to_string(), + "Arch-Router".to_string(), + "arch-router".to_string(), + Some(ttl_seconds), + Some(max_entries), + ) + } + + #[tokio::test] + async fn test_cache_miss_returns_none() { + let svc = make_router_service(600, 100); + assert!(svc.get_cached_route("unknown-session").await.is_none()); + } + + #[tokio::test] + async fn test_cache_hit_returns_cached_route() { + let svc = make_router_service(600, 100); + svc.cache_route( + "s1".to_string(), + "gpt-4o".to_string(), + Some("code".to_string()), + ) + .await; + + let cached = svc.get_cached_route("s1").await.unwrap(); + assert_eq!(cached.model_name, "gpt-4o"); + assert_eq!(cached.route_name, Some("code".to_string())); + } + + #[tokio::test] + async fn test_cache_expired_entry_returns_none() { + let svc = make_router_service(0, 100); + svc.cache_route("s1".to_string(), "gpt-4o".to_string(), None) + .await; + assert!(svc.get_cached_route("s1").await.is_none()); + } + + #[tokio::test] + async fn test_cleanup_removes_expired() { + let svc = make_router_service(0, 100); + svc.cache_route("s1".to_string(), "gpt-4o".to_string(), None) + .await; + svc.cache_route("s2".to_string(), "claude".to_string(), None) + .await; + + svc.cleanup_expired_sessions().await; + + let cache = svc.session_cache.read().await; + assert!(cache.is_empty()); + } + + #[tokio::test] + async fn 
test_cache_evicts_oldest_when_full() { + let svc = make_router_service(600, 2); + svc.cache_route("s1".to_string(), "model-a".to_string(), None) + .await; + tokio::time::sleep(Duration::from_millis(10)).await; + svc.cache_route("s2".to_string(), "model-b".to_string(), None) + .await; + + svc.cache_route("s3".to_string(), "model-c".to_string(), None) + .await; + + let cache = svc.session_cache.read().await; + assert_eq!(cache.len(), 2); + assert!(!cache.contains_key("s1")); + assert!(cache.contains_key("s2")); + assert!(cache.contains_key("s3")); + } + + #[tokio::test] + async fn test_cache_update_existing_session_does_not_evict() { + let svc = make_router_service(600, 2); + svc.cache_route("s1".to_string(), "model-a".to_string(), None) + .await; + svc.cache_route("s2".to_string(), "model-b".to_string(), None) + .await; + + svc.cache_route( + "s1".to_string(), + "model-a-updated".to_string(), + Some("route".to_string()), + ) + .await; + + let cache = svc.session_cache.read().await; + assert_eq!(cache.len(), 2); + assert_eq!(cache.get("s1").unwrap().model_name, "model-a-updated"); + } +} diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index 7aa23c67..c4c5924a 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -7,6 +7,14 @@ use crate::api::open_ai::{ ChatCompletionTool, FunctionDefinition, FunctionParameter, FunctionParameters, ParameterType, }; +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct Routing { + pub llm_provider: Option, + pub model: Option, + pub session_ttl_seconds: Option, + pub session_max_entries: Option, +} + #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ModelAlias { pub target: String, @@ -182,6 +190,7 @@ pub struct Configuration { pub model_providers: Vec, pub model_aliases: Option>, pub overrides: Option, + pub routing: Option, pub system_prompt: Option, pub prompt_guards: Option, pub prompt_targets: Option>, diff --git 
a/crates/common/src/consts.rs b/crates/common/src/consts.rs index dbd0bc41..c99639ad 100644 --- a/crates/common/src/consts.rs +++ b/crates/common/src/consts.rs @@ -22,6 +22,7 @@ pub const X_ARCH_TOOL_CALL: &str = "x-arch-tool-call-message"; pub const X_ARCH_FC_MODEL_RESPONSE: &str = "x-arch-fc-model-response"; pub const ARCH_FC_MODEL_NAME: &str = "Arch-Function"; pub const REQUEST_ID_HEADER: &str = "x-request-id"; +pub const MODEL_AFFINITY_HEADER: &str = "x-model-affinity"; pub const ENVOY_ORIGINAL_PATH_HEADER: &str = "x-envoy-original-path"; pub const TRACE_PARENT_HEADER: &str = "traceparent"; pub const ARCH_INTERNAL_CLUSTER_NAME: &str = "arch_internal"; diff --git a/demos/llm_routing/model_routing_service/README.md b/demos/llm_routing/model_routing_service/README.md index e8c8a995..4687b47c 100644 --- a/demos/llm_routing/model_routing_service/README.md +++ b/demos/llm_routing/model_routing_service/README.md @@ -106,6 +106,63 @@ Response: The response contains the model list — your client should try `models[0]` first and fall back to `models[1]` on 429 or 5xx errors. +## Session Pinning + +Send an `X-Model-Affinity` header to pin the routing decision for a session. Once a model is selected, all subsequent requests with the same session ID return the same model without re-running routing. 
+
+```bash
+# First call — runs routing, caches result
+curl http://localhost:12000/routing/v1/chat/completions \
+  -H "Content-Type: application/json" \
+  -H "X-Model-Affinity: my-session-123" \
+  -d '{
+    "model": "gpt-4o-mini",
+    "messages": [{"role": "user", "content": "Write a Python function for binary search"}]
+  }'
+```
+
+Response (first call):
+```json
+{
+  "models": ["anthropic/claude-sonnet-4-20250514", "openai/gpt-4o"],
+  "route": "code_generation",
+  "trace_id": "c16d1096c1af4a17abb48fb182918a88",
+  "session_id": "my-session-123",
+  "pinned": false
+}
+```
+
+```bash
+# Second call — same session, returns cached result
+curl http://localhost:12000/routing/v1/chat/completions \
+  -H "Content-Type: application/json" \
+  -H "X-Model-Affinity: my-session-123" \
+  -d '{
+    "model": "gpt-4o-mini",
+    "messages": [{"role": "user", "content": "Now explain merge sort"}]
+  }'
+```
+
+Response (pinned):
+```json
+{
+  "models": ["anthropic/claude-sonnet-4-20250514"],
+  "route": "code_generation",
+  "trace_id": "a1b2c3d4e5f6...",
+  "session_id": "my-session-123",
+  "pinned": true
+}
+```
+
+Session TTL and max cache size are configurable in `config.yaml`:
+```yaml
+routing:
+  session_ttl_seconds: 600   # default: 600 (10 minutes)
+  session_max_entries: 10000 # default: 10000
+```
+
+Without the `X-Model-Affinity` header, routing runs fresh every time (no breaking change).
+
 ## Kubernetes Deployment (Self-hosted Arch-Router on GPU)
 
 To run Arch-Router in-cluster using vLLM instead of the default hosted endpoint:
@@ -167,3 +224,68 @@ kubectl create configmap plano-config \
   --dry-run=client -o yaml | kubectl apply -f -
 kubectl rollout restart deployment/plano
 ```
+
+
+## Demo Output
+
+```
+=== Model Routing Service Demo ===
+
+--- 1. Code generation query (OpenAI format) ---
+{
+  "models": ["anthropic/claude-sonnet-4-20250514", "openai/gpt-4o"],
+  "route": "code_generation",
+  "trace_id": "c16d1096c1af4a17abb48fb182918a88"
+}
+
+--- 2. 
Complex reasoning query (OpenAI format) ---
+{
+  "models": ["openai/gpt-4o", "openai/gpt-4o-mini"],
+  "route": "complex_reasoning",
+  "trace_id": "30795e228aff4d7696f082ed01b75ad4"
+}
+
+--- 3. Simple query - no routing match (OpenAI format) ---
+{
+  "models": ["none"],
+  "route": null,
+  "trace_id": "ae0b6c3b220d499fb5298ac63f4eac0e"
+}
+
+--- 4. Code generation query (Anthropic format) ---
+{
+  "models": ["anthropic/claude-sonnet-4-20250514", "openai/gpt-4o"],
+  "route": "code_generation",
+  "trace_id": "26be822bbdf14a3ba19fe198e55ea4a9"
+}
+
+--- 7. Session pinning - first call (fresh routing decision) ---
+{
+  "models": ["anthropic/claude-sonnet-4-20250514", "openai/gpt-4o"],
+  "route": "code_generation",
+  "trace_id": "f1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6",
+  "session_id": "demo-session-001",
+  "pinned": false
+}
+
+--- 8. Session pinning - second call (same session, pinned) ---
+    Notice: same model returned with "pinned": true, routing was skipped
+{
+  "models": ["anthropic/claude-sonnet-4-20250514"],
+  "route": "code_generation",
+  "trace_id": "a9b8c7d6e5f4a3b2c1d0e9f8a7b6c5d4",
+  "session_id": "demo-session-001",
+  "pinned": true
+}
+
+--- 9. Different session gets its own fresh routing ---
+{
+  "models": ["openai/gpt-4o", "openai/gpt-4o-mini"],
+  "route": "complex_reasoning",
+  "trace_id": "1a2b3c4d5e6f7a8b9c0d1e2f3a4b5c6d",
+  "session_id": "demo-session-002",
+  "pinned": false
+}
+
+=== Demo Complete ===
+```
diff --git a/demos/llm_routing/model_routing_service/demo.sh b/demos/llm_routing/model_routing_service/demo.sh
index 3ad102f1..dafd60b3 100755
--- a/demos/llm_routing/model_routing_service/demo.sh
+++ b/demos/llm_routing/model_routing_service/demo.sh
@@ -109,4 +109,47 @@ curl -s "$PLANO_URL/routing/v1/chat/completions" \
   }' | python3 -m json.tool
 echo ""
 
+# --- Example 7: Session pinning - first call (fresh routing) ---
+echo "--- 7. 
Session pinning - first call (fresh routing decision) ---" +echo "" +curl -s "$PLANO_URL/routing/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -H "X-Model-Affinity: demo-session-001" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "Write a Python function that implements binary search on a sorted array"} + ] + }' | python3 -m json.tool +echo "" + +# --- Example 8: Session pinning - second call (pinned result) --- +echo "--- 8. Session pinning - second call (same session, pinned) ---" +echo " Notice: same model returned with \"pinned\": true, routing was skipped" +echo "" +curl -s "$PLANO_URL/routing/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -H "X-Model-Affinity: demo-session-001" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "Now explain how merge sort works and when to prefer it over quicksort"} + ] + }' | python3 -m json.tool +echo "" + +# --- Example 9: Different session gets fresh routing --- +echo "--- 9. Different session gets its own fresh routing ---" +echo "" +curl -s "$PLANO_URL/routing/v1/chat/completions" \ + -H "Content-Type: application/json" \ + -H "X-Model-Affinity: demo-session-002" \ + -d '{ + "model": "gpt-4o-mini", + "messages": [ + {"role": "user", "content": "Explain the trade-offs between microservices and monolithic architectures"} + ] + }' | python3 -m json.tool +echo "" + echo "=== Demo Complete ===" diff --git a/docs/routing-api.md b/docs/routing-api.md index 0b30d627..c2b9c63f 100644 --- a/docs/routing-api.md +++ b/docs/routing-api.md @@ -120,6 +120,49 @@ routing_preferences: --- +## Model Affinity + +In agentic loops where the same session makes multiple LLM calls, send an `X-Model-Affinity` header to pin the routing decision. The first request routes normally and caches the result. All subsequent requests with the same affinity ID return the cached model without re-running routing. 
+
+```json
+POST /v1/chat/completions
+X-Model-Affinity: a1b2c3d4-5678-...
+
+{
+  "model": "openai/gpt-4o-mini",
+  "messages": [...]
+}
+```
+
+The routing decision endpoint also supports model affinity:
+
+```json
+POST /routing/v1/chat/completions
+X-Model-Affinity: a1b2c3d4-5678-...
+```
+
+Response when pinned:
+```json
+{
+  "models": ["anthropic/claude-sonnet-4-20250514"],
+  "route": "code_generation",
+  "trace_id": "...",
+  "session_id": "a1b2c3d4-5678-...",
+  "pinned": true
+}
+```
+
+Without the header, routing runs fresh every time (no breaking change).
+
+Configure TTL and cache size:
+```yaml
+routing:
+  session_ttl_seconds: 600   # default: 10 min
+  session_max_entries: 10000 # upper limit
+```
+
+---
+
 ## Version Requirements
 
 | Version | Top-level `routing_preferences` |
diff --git a/docs/source/guides/llm_router.rst b/docs/source/guides/llm_router.rst
index 7c4ad685..f294043a 100644
--- a/docs/source/guides/llm_router.rst
+++ b/docs/source/guides/llm_router.rst
@@ -376,6 +376,44 @@ For the canonical Plano Kubernetes deployment (ConfigMap, Secrets, Deployment YA
 `demo README `_.
 
+.. _model_affinity:
+
+Model Affinity
+--------------
+
+In agentic loops — where a single user request triggers multiple LLM calls through tool use — Plano's router classifies each turn independently. Because successive prompts differ in intent (tool selection looks like code generation, reasoning about results looks like analysis), the router may select different models mid-session. This causes behavioral inconsistency and invalidates provider-side KV caches, increasing both latency and cost.
+
+**Model affinity** pins the routing decision for the duration of a session. Send an ``X-Model-Affinity`` header with any string identifier (typically a UUID). The first request routes normally and caches the result. All subsequent requests with the same affinity ID skip routing and reuse the cached model.
+
+.. 
code-block:: python + + import uuid + from openai import OpenAI + + client = OpenAI(base_url="http://localhost:12000/v1", api_key="EMPTY") + affinity_id = str(uuid.uuid4()) + + # Every call in the loop uses the same header + response = client.chat.completions.create( + model="gpt-4o-mini", + messages=messages, + tools=tools, + extra_headers={"X-Model-Affinity": affinity_id}, + ) + +Without the header, routing runs fresh on every request — no behavior change for existing clients. + +**Configuration:** + +.. code-block:: yaml + + routing: + session_ttl_seconds: 600 # How long affinity lasts (default: 10 min) + session_max_entries: 10000 # Max cached sessions (upper limit: 10000) + +To start a new routing decision (e.g., when the agent's task changes), generate a new affinity ID. + + Combining Routing Methods ------------------------- diff --git a/docs/source/resources/includes/plano_config_full_reference.yaml b/docs/source/resources/includes/plano_config_full_reference.yaml index 452bc17a..787b09d3 100644 --- a/docs/source/resources/includes/plano_config_full_reference.yaml +++ b/docs/source/resources/includes/plano_config_full_reference.yaml @@ -174,6 +174,11 @@ overrides: # Model used for agent orchestration (must be listed in model_providers) agent_orchestration_model: Plano-Orchestrator +# Model affinity — pin routing decisions for agentic loops +routing: + session_ttl_seconds: 600 # How long a pinned session lasts (default: 600s / 10 min) + session_max_entries: 10000 # Max cached sessions before eviction (upper limit: 10000) + # State storage for multi-turn conversation history state_storage: type: memory # "memory" (in-process) or "postgres" (persistent) diff --git a/docs/source/resources/includes/plano_config_full_reference_rendered.yaml b/docs/source/resources/includes/plano_config_full_reference_rendered.yaml index c4a17762..c4511b9d 100644 --- a/docs/source/resources/includes/plano_config_full_reference_rendered.yaml +++ 
b/docs/source/resources/includes/plano_config_full_reference_rendered.yaml @@ -215,6 +215,9 @@ ratelimits: selector: key: x-org-id value: acme-corp +routing: + session_max_entries: 10000 + session_ttl_seconds: 600 state_storage: type: memory system_prompt: 'You are a helpful assistant. Always respond concisely and accurately.