Model affinity for consistent model selection in agentic loops (#827)
Some checks are pending
CI / pre-commit (push) Waiting to run
CI / plano-tools-tests (push) Waiting to run
CI / native-smoke-test (push) Waiting to run
CI / docker-build (push) Waiting to run
CI / validate-config (push) Waiting to run
CI / security-scan (push) Blocked by required conditions
CI / test-prompt-gateway (push) Blocked by required conditions
CI / test-model-alias-routing (push) Blocked by required conditions
CI / test-responses-api-with-state (push) Blocked by required conditions
CI / e2e-plano-tests (3.10) (push) Blocked by required conditions
CI / e2e-plano-tests (3.11) (push) Blocked by required conditions
CI / e2e-plano-tests (3.12) (push) Blocked by required conditions
CI / e2e-plano-tests (3.13) (push) Blocked by required conditions
CI / e2e-plano-tests (3.14) (push) Blocked by required conditions
CI / e2e-demo-preference (push) Blocked by required conditions
CI / e2e-demo-currency (push) Blocked by required conditions
Publish docker image (latest) / build-arm64 (push) Waiting to run
Publish docker image (latest) / build-amd64 (push) Waiting to run
Publish docker image (latest) / create-manifest (push) Blocked by required conditions
Build and Deploy Documentation / build (push) Waiting to run

This commit is contained in:
Adil Hafeez 2026-04-08 17:32:02 -07:00 committed by GitHub
parent 978b1ea722
commit 8dedf0bec1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 614 additions and 43 deletions

View file

@ -1,6 +1,6 @@
use bytes::Bytes;
use common::configuration::{FilterPipeline, ModelAlias};
use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER};
use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, MODEL_AFFINITY_HEADER};
use common::llm_providers::LlmProviders;
use hermesllm::apis::openai::Message;
use hermesllm::apis::openai_responses::InputParam;
@ -94,6 +94,21 @@ async fn llm_chat_inner(
let traceparent = extract_or_generate_traceparent(&request_headers);
// Session pinning: extract session ID and check cache before routing
let session_id: Option<String> = request_headers
.get(MODEL_AFFINITY_HEADER)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string());
let pinned_model: Option<String> = if let Some(ref sid) = session_id {
state
.router_service
.get_cached_route(sid)
.await
.map(|c| c.model_name)
} else {
None
};
let full_qualified_llm_provider_url = format!("{}{}", state.llm_provider_url, request_path);
// --- Phase 1: Parse and validate the incoming request ---
@ -244,46 +259,65 @@ async fn llm_chat_inner(
}
};
// --- Phase 3: Route the request ---
let routing_span = info_span!(
"routing",
component = "routing",
http.method = "POST",
http.target = %request_path,
model.requested = %model_from_request,
model.alias_resolved = %alias_resolved_model,
route.selected_model = tracing::field::Empty,
routing.determination_ms = tracing::field::Empty,
);
let routing_result = match async {
set_service_name(operation_component::ROUTING);
router_chat_get_upstream_model(
Arc::clone(&state.router_service),
client_request,
&traceparent,
&request_path,
&request_id,
inline_routing_preferences,
)
.await
}
.instrument(routing_span)
.await
{
Ok(result) => result,
Err(err) => {
let mut internal_error = Response::new(full(err.message));
*internal_error.status_mut() = err.status_code;
return Ok(internal_error);
}
};
// Determine final model (router returns "none" when it doesn't select a specific model)
let router_selected_model = routing_result.model_name;
let resolved_model = if router_selected_model != "none" {
router_selected_model
// --- Phase 3: Route the request (or use pinned model from session cache) ---
let resolved_model = if let Some(cached_model) = pinned_model {
info!(
session_id = %session_id.as_deref().unwrap_or(""),
model = %cached_model,
"using pinned routing decision from cache"
);
cached_model
} else {
alias_resolved_model.clone()
let routing_span = info_span!(
"routing",
component = "routing",
http.method = "POST",
http.target = %request_path,
model.requested = %model_from_request,
model.alias_resolved = %alias_resolved_model,
route.selected_model = tracing::field::Empty,
routing.determination_ms = tracing::field::Empty,
);
let routing_result = match async {
set_service_name(operation_component::ROUTING);
router_chat_get_upstream_model(
Arc::clone(&state.router_service),
client_request,
&traceparent,
&request_path,
&request_id,
inline_routing_preferences,
)
.await
}
.instrument(routing_span)
.await
{
Ok(result) => result,
Err(err) => {
let mut internal_error = Response::new(full(err.message));
*internal_error.status_mut() = err.status_code;
return Ok(internal_error);
}
};
let (router_selected_model, route_name) =
(routing_result.model_name, routing_result.route_name);
let model = if router_selected_model != "none" {
router_selected_model
} else {
alias_resolved_model.clone()
};
// Cache the routing decision so subsequent requests with the same session ID are pinned
if let Some(ref sid) = session_id {
state
.router_service
.cache_route(sid.clone(), model.clone(), route_name)
.await;
}
model
};
tracing::Span::current().record(tracing_llm::MODEL_NAME, resolved_model.as_str());

View file

@ -1,6 +1,6 @@
use bytes::Bytes;
use common::configuration::{SpanAttributes, TopLevelRoutingPreference};
use common::consts::REQUEST_ID_HEADER;
use common::consts::{MODEL_AFFINITY_HEADER, REQUEST_ID_HEADER};
use common::errors::BrightStaffError;
use hermesllm::clients::SupportedAPIsFromClient;
use hermesllm::ProviderRequestType;
@ -53,6 +53,9 @@ struct RoutingDecisionResponse {
models: Vec<String>,
route: Option<String>,
trace_id: String,
#[serde(skip_serializing_if = "Option::is_none")]
session_id: Option<String>,
pinned: bool,
}
pub async fn routing_decision(
@ -68,6 +71,11 @@ pub async fn routing_decision(
.map(|s| s.to_string())
.unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
let session_id: Option<String> = request_headers
.get(MODEL_AFFINITY_HEADER)
.and_then(|h| h.to_str().ok())
.map(|s| s.to_string());
let custom_attrs = collect_custom_trace_attributes(&request_headers, span_attributes.as_ref());
let request_span = info_span!(
@ -85,6 +93,7 @@ pub async fn routing_decision(
request_path,
request_headers,
custom_attrs,
session_id,
)
.instrument(request_span)
.await
@ -97,6 +106,7 @@ async fn routing_decision_inner(
request_path: String,
request_headers: hyper::HeaderMap,
custom_attrs: std::collections::HashMap<String, String>,
session_id: Option<String>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
set_service_name(operation_component::ROUTING);
opentelemetry::trace::get_active_span(|span| {
@ -114,6 +124,34 @@ async fn routing_decision_inner(
.unwrap_or("unknown")
.to_string();
// Session pinning: check cache before doing any routing work
if let Some(ref sid) = session_id {
if let Some(cached) = router_service.get_cached_route(sid).await {
info!(
session_id = %sid,
model = %cached.model_name,
route = ?cached.route_name,
"returning pinned routing decision from cache"
);
let response = RoutingDecisionResponse {
models: vec![cached.model_name],
route: cached.route_name,
trace_id,
session_id: Some(sid.clone()),
pinned: true,
};
let json = serde_json::to_string(&response).unwrap();
let body = Full::new(Bytes::from(json))
.map_err(|never| match never {})
.boxed();
return Ok(Response::builder()
.status(StatusCode::OK)
.header("Content-Type", "application/json")
.body(body)
.unwrap());
}
}
// Parse request body
let raw_bytes = request.collect().await?.to_bytes();
@ -152,7 +190,7 @@ async fn routing_decision_inner(
};
let routing_result = router_chat_get_upstream_model(
router_service,
Arc::clone(&router_service),
client_request,
&traceparent,
&request_path,
@ -163,10 +201,23 @@ async fn routing_decision_inner(
match routing_result {
Ok(result) => {
// Cache the result if session_id is present
if let Some(ref sid) = session_id {
router_service
.cache_route(
sid.clone(),
result.model_name.clone(),
result.route_name.clone(),
)
.await;
}
let response = RoutingDecisionResponse {
models: result.models,
route: result.route_name,
trace_id,
session_id,
pinned: false,
};
info!(
@ -329,6 +380,8 @@ mod tests {
],
route: Some("code_generation".to_string()),
trace_id: "abc123".to_string(),
session_id: Some("sess-abc".to_string()),
pinned: true,
};
let json = serde_json::to_string(&response).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
@ -336,6 +389,8 @@ mod tests {
assert_eq!(parsed["models"][1], "openai/gpt-4o");
assert_eq!(parsed["route"], "code_generation");
assert_eq!(parsed["trace_id"], "abc123");
assert_eq!(parsed["session_id"], "sess-abc");
assert_eq!(parsed["pinned"], true);
}
#[test]
@ -344,10 +399,14 @@ mod tests {
models: vec!["none".to_string()],
route: None,
trace_id: "abc123".to_string(),
session_id: None,
pinned: false,
};
let json = serde_json::to_string(&response).unwrap();
let parsed: serde_json::Value = serde_json::from_str(&json).unwrap();
assert_eq!(parsed["models"][0], "none");
assert!(parsed["route"].is_null());
assert!(parsed.get("session_id").is_none());
assert_eq!(parsed["pinned"], false);
}
}

View file

@ -174,6 +174,9 @@ async fn init_app_state(
.map(|p| p.name.clone())
.unwrap_or_else(|| DEFAULT_ROUTING_LLM_PROVIDER.to_string());
let session_ttl_seconds = config.routing.as_ref().and_then(|r| r.session_ttl_seconds);
let session_max_entries = config.routing.as_ref().and_then(|r| r.session_max_entries);
// Validate that top-level routing_preferences requires v0.4.0+.
let config_version = parse_semver(&config.version);
let is_v040_plus = config_version >= (0, 4, 0);
@ -300,8 +303,22 @@ async fn init_app_state(
format!("{llm_provider_url}{CHAT_COMPLETIONS_PATH}"),
routing_model_name,
routing_llm_provider,
session_ttl_seconds,
session_max_entries,
));
// Spawn background task to clean up expired session cache entries every 5 minutes
{
let router_service = Arc::clone(&router_service);
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(300));
loop {
interval.tick().await;
router_service.cleanup_expired_sessions().await;
}
});
}
let orchestrator_model_name: String = overrides
.agent_orchestration_model
.as_deref()

View file

@ -1,4 +1,4 @@
use std::{collections::HashMap, sync::Arc};
use std::{collections::HashMap, sync::Arc, time::Duration, time::Instant};
use common::{
configuration::TopLevelRoutingPreference,
@ -9,6 +9,7 @@ use super::router_model::{ModelUsagePreference, RoutingPreference};
use hermesllm::apis::openai::Message;
use hyper::header;
use thiserror::Error;
use tokio::sync::RwLock;
use tracing::{debug, info};
use super::http::{self, post_and_extract_content};
@ -17,6 +18,17 @@ use super::router_model::RouterModel;
use crate::router::router_model_v1;
/// Default lifetime of a pinned session routing decision when
/// `routing.session_ttl_seconds` is not configured (10 minutes).
const DEFAULT_SESSION_TTL_SECONDS: u64 = 600;
/// Default session-cache capacity when `routing.session_max_entries` is not configured.
const DEFAULT_SESSION_MAX_ENTRIES: usize = 10_000;
/// Hard upper bound on the session-cache capacity; user-configured values are
/// clamped to this (via `.min(...)`) to bound memory use. Intentionally equal
/// to the default, so configuration can only shrink the cache, never grow it.
const MAX_SESSION_MAX_ENTRIES: usize = 10_000;
/// A routing decision pinned to a session ID in the session cache.
#[derive(Clone, Debug)]
pub struct CachedRoute {
    /// Model the router selected for this session.
    pub model_name: String,
    /// Name of the matched route, if the router matched a named route.
    pub route_name: Option<String>,
    /// Insertion time; compared against the service's session TTL on lookup
    /// and during periodic cleanup.
    pub cached_at: Instant,
}
pub struct RouterService {
router_url: String,
client: reqwest::Client,
@ -24,6 +36,9 @@ pub struct RouterService {
routing_provider_name: String,
top_level_preferences: HashMap<String, TopLevelRoutingPreference>,
metrics_service: Option<Arc<ModelMetricsService>>,
session_cache: RwLock<HashMap<String, CachedRoute>>,
session_ttl: Duration,
session_max_entries: usize,
}
#[derive(Debug, Error)]
@ -44,6 +59,8 @@ impl RouterService {
router_url: String,
routing_model_name: String,
routing_provider_name: String,
session_ttl_seconds: Option<u64>,
session_max_entries: Option<usize>,
) -> Self {
let top_level_preferences: HashMap<String, TopLevelRoutingPreference> = top_level_prefs
.map_or_else(HashMap::new, |prefs| {
@ -74,6 +91,12 @@ impl RouterService {
router_model_v1::MAX_TOKEN_LEN,
));
let session_ttl =
Duration::from_secs(session_ttl_seconds.unwrap_or(DEFAULT_SESSION_TTL_SECONDS));
let session_max_entries = session_max_entries
.unwrap_or(DEFAULT_SESSION_MAX_ENTRIES)
.min(MAX_SESSION_MAX_ENTRIES);
RouterService {
router_url,
client: reqwest::Client::new(),
@ -81,6 +104,64 @@ impl RouterService {
routing_provider_name,
top_level_preferences,
metrics_service,
session_cache: RwLock::new(HashMap::new()),
session_ttl,
session_max_entries,
}
}
/// Fetch the routing decision pinned to `session_id`.
///
/// Returns `None` when no entry exists or the entry has outlived the
/// configured session TTL (expired entries are left in place for the
/// periodic cleanup task to remove).
pub async fn get_cached_route(&self, session_id: &str) -> Option<CachedRoute> {
    let cache = self.session_cache.read().await;
    cache
        .get(session_id)
        .filter(|entry| entry.cached_at.elapsed() < self.session_ttl)
        .cloned()
}
/// Pin `session_id` to a routing decision.
///
/// When the cache is at capacity and `session_id` is not already present,
/// the entry with the earliest `cached_at` is evicted to make room;
/// updating an existing session never evicts another entry.
pub async fn cache_route(
    &self,
    session_id: String,
    model_name: String,
    route_name: Option<String>,
) {
    let mut cache = self.session_cache.write().await;
    // Evict only when inserting a brand-new key at capacity.
    let must_evict =
        cache.len() >= self.session_max_entries && !cache.contains_key(&session_id);
    if must_evict {
        // O(n) scan for the oldest entry; acceptable for the bounded cache size.
        let oldest = cache
            .iter()
            .min_by_key(|(_, entry)| entry.cached_at)
            .map(|(key, _)| key.clone());
        if let Some(key) = oldest {
            cache.remove(&key);
        }
    }
    let entry = CachedRoute {
        model_name,
        route_name,
        cached_at: Instant::now(),
    };
    cache.insert(session_id, entry);
}
/// Drop every session entry whose age has reached or exceeded the
/// configured TTL, logging how many were removed (only when any were).
pub async fn cleanup_expired_sessions(&self) {
    let mut cache = self.session_cache.write().await;
    let original_len = cache.len();
    cache.retain(|_, entry| entry.cached_at.elapsed() < self.session_ttl);
    let evicted = original_len - cache.len();
    if evicted == 0 {
        return;
    }
    info!(
        removed = evicted,
        remaining = cache.len(),
        "cleaned up expired session cache entries"
    );
}
@ -198,3 +279,102 @@ impl RouterService {
Ok(result)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a `RouterService` with no preferences or metrics, pointed at a
    /// local router URL, using the given session-cache TTL and capacity.
    fn make_router_service(ttl_seconds: u64, max_entries: usize) -> RouterService {
        RouterService::new(
            None,
            None,
            "http://localhost:12001/v1/chat/completions".to_string(),
            "Arch-Router".to_string(),
            "arch-router".to_string(),
            Some(ttl_seconds),
            Some(max_entries),
        )
    }

    #[tokio::test]
    async fn test_cache_miss_returns_none() {
        let service = make_router_service(600, 100);
        let result = service.get_cached_route("unknown-session").await;
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_cache_hit_returns_cached_route() {
        let service = make_router_service(600, 100);
        service
            .cache_route(
                "s1".to_string(),
                "gpt-4o".to_string(),
                Some("code".to_string()),
            )
            .await;
        let hit = service
            .get_cached_route("s1")
            .await
            .expect("freshly cached entry should be returned");
        assert_eq!(hit.model_name, "gpt-4o");
        assert_eq!(hit.route_name.as_deref(), Some("code"));
    }

    #[tokio::test]
    async fn test_cache_expired_entry_returns_none() {
        // A TTL of zero means every entry is already expired on read.
        let service = make_router_service(0, 100);
        service
            .cache_route("s1".to_string(), "gpt-4o".to_string(), None)
            .await;
        assert!(service.get_cached_route("s1").await.is_none());
    }

    #[tokio::test]
    async fn test_cleanup_removes_expired() {
        let service = make_router_service(0, 100);
        for (sid, model) in [("s1", "gpt-4o"), ("s2", "claude")] {
            service
                .cache_route(sid.to_string(), model.to_string(), None)
                .await;
        }
        service.cleanup_expired_sessions().await;
        let cache = service.session_cache.read().await;
        assert!(cache.is_empty());
    }

    #[tokio::test]
    async fn test_cache_evicts_oldest_when_full() {
        let service = make_router_service(600, 2);
        service
            .cache_route("s1".to_string(), "model-a".to_string(), None)
            .await;
        // Make s1 strictly older than the entries that follow it.
        tokio::time::sleep(Duration::from_millis(10)).await;
        service
            .cache_route("s2".to_string(), "model-b".to_string(), None)
            .await;
        service
            .cache_route("s3".to_string(), "model-c".to_string(), None)
            .await;
        let cache = service.session_cache.read().await;
        assert_eq!(cache.len(), 2);
        assert!(!cache.contains_key("s1"));
        assert!(cache.contains_key("s2"));
        assert!(cache.contains_key("s3"));
    }

    #[tokio::test]
    async fn test_cache_update_existing_session_does_not_evict() {
        let service = make_router_service(600, 2);
        service
            .cache_route("s1".to_string(), "model-a".to_string(), None)
            .await;
        service
            .cache_route("s2".to_string(), "model-b".to_string(), None)
            .await;
        // Re-inserting s1 at capacity overwrites it in place rather than
        // evicting s2.
        service
            .cache_route(
                "s1".to_string(),
                "model-a-updated".to_string(),
                Some("route".to_string()),
            )
            .await;
        let cache = service.session_cache.read().await;
        assert_eq!(cache.len(), 2);
        assert_eq!(cache.get("s1").unwrap().model_name, "model-a-updated");
    }
}