mirror of
https://github.com/katanemo/plano.git
synced 2026-05-11 00:32:42 +02:00
use plano-orchestrator for LLM routing, remove arch-router (#886)
This commit is contained in:
parent
980faef6be
commit
90b926c2ce
29 changed files with 407 additions and 1412 deletions
|
|
@ -177,6 +177,7 @@ mod tests {
|
|||
"http://localhost:8080".to_string(),
|
||||
"test-model".to_string(),
|
||||
"plano-orchestrator".to_string(),
|
||||
crate::router::orchestrator_model_v1::MAX_TOKEN_LEN,
|
||||
))
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,6 +23,7 @@ mod tests {
|
|||
"http://localhost:8080".to_string(),
|
||||
"test-model".to_string(),
|
||||
"plano-orchestrator".to_string(),
|
||||
crate::router::orchestrator_model_v1::MAX_TOKEN_LEN,
|
||||
))
|
||||
}
|
||||
|
||||
|
|
@ -147,8 +148,8 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_error_handling_flow() {
|
||||
let router_service = create_test_orchestrator_service();
|
||||
let agent_selector = AgentSelector::new(router_service);
|
||||
let orchestrator_service = create_test_orchestrator_service();
|
||||
let agent_selector = AgentSelector::new(orchestrator_service);
|
||||
|
||||
// Test listener not found
|
||||
let result = agent_selector.find_listener(Some("nonexistent"), &[]);
|
||||
|
|
|
|||
|
|
@ -22,7 +22,6 @@ pub(crate) mod model_selection;
|
|||
|
||||
use crate::app_state::AppState;
|
||||
use crate::handlers::agents::pipeline::PipelineProcessor;
|
||||
use crate::handlers::extract_or_generate_traceparent;
|
||||
use crate::handlers::extract_request_id;
|
||||
use crate::handlers::full;
|
||||
use crate::state::response_state_processor::ResponsesStateProcessor;
|
||||
|
|
@ -92,22 +91,20 @@ async fn llm_chat_inner(
|
|||
}
|
||||
});
|
||||
|
||||
let traceparent = extract_or_generate_traceparent(&request_headers);
|
||||
|
||||
// Session pinning: extract session ID and check cache before routing
|
||||
let session_id: Option<String> = request_headers
|
||||
.get(MODEL_AFFINITY_HEADER)
|
||||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
let tenant_id: Option<String> = state
|
||||
.router_service
|
||||
.orchestrator_service
|
||||
.tenant_header()
|
||||
.and_then(|hdr| request_headers.get(hdr))
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
let pinned_model: Option<String> = if let Some(ref sid) = session_id {
|
||||
state
|
||||
.router_service
|
||||
.orchestrator_service
|
||||
.get_cached_route(sid, tenant_id.as_deref())
|
||||
.await
|
||||
.map(|c| c.model_name)
|
||||
|
|
@ -287,9 +284,8 @@ async fn llm_chat_inner(
|
|||
let routing_result = match async {
|
||||
set_service_name(operation_component::ROUTING);
|
||||
router_chat_get_upstream_model(
|
||||
Arc::clone(&state.router_service),
|
||||
Arc::clone(&state.orchestrator_service),
|
||||
client_request,
|
||||
&traceparent,
|
||||
&request_path,
|
||||
&request_id,
|
||||
inline_routing_preferences,
|
||||
|
|
@ -315,10 +311,9 @@ async fn llm_chat_inner(
|
|||
alias_resolved_model.clone()
|
||||
};
|
||||
|
||||
// Cache the routing decision so subsequent requests with the same session ID are pinned
|
||||
if let Some(ref sid) = session_id {
|
||||
state
|
||||
.router_service
|
||||
.orchestrator_service
|
||||
.cache_route(sid.clone(), tenant_id.as_deref(), model.clone(), route_name)
|
||||
.await;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -5,7 +5,7 @@ use hyper::StatusCode;
|
|||
use std::sync::Arc;
|
||||
use tracing::{debug, info, warn};
|
||||
|
||||
use crate::router::llm::RouterService;
|
||||
use crate::router::orchestrator::OrchestratorService;
|
||||
use crate::streaming::truncate_message;
|
||||
use crate::tracing::routing;
|
||||
|
||||
|
|
@ -37,9 +37,8 @@ impl RoutingError {
|
|||
/// * `Ok(RoutingResult)` - Contains the selected model name and span ID
|
||||
/// * `Err(RoutingError)` - Contains error details and optional span ID
|
||||
pub async fn router_chat_get_upstream_model(
|
||||
router_service: Arc<RouterService>,
|
||||
orchestrator_service: Arc<OrchestratorService>,
|
||||
client_request: ProviderRequestType,
|
||||
traceparent: &str,
|
||||
request_path: &str,
|
||||
request_id: &str,
|
||||
inline_routing_preferences: Option<Vec<TopLevelRoutingPreference>>,
|
||||
|
|
@ -99,11 +98,9 @@ pub async fn router_chat_get_upstream_model(
|
|||
// Capture start time for routing span
|
||||
let routing_start_time = std::time::Instant::now();
|
||||
|
||||
// Attempt to determine route using the router service
|
||||
let routing_result = router_service
|
||||
let routing_result = orchestrator_service
|
||||
.determine_route(
|
||||
&chat_request.messages,
|
||||
traceparent,
|
||||
inline_routing_preferences,
|
||||
request_id,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ use tracing::{debug, info, info_span, warn, Instrument};
|
|||
|
||||
use super::extract_or_generate_traceparent;
|
||||
use crate::handlers::llm::model_selection::router_chat_get_upstream_model;
|
||||
use crate::router::llm::RouterService;
|
||||
use crate::router::orchestrator::OrchestratorService;
|
||||
use crate::tracing::{collect_custom_trace_attributes, operation_component, set_service_name};
|
||||
|
||||
/// Extracts `routing_preferences` from a JSON body, returning the cleaned body bytes
|
||||
|
|
@ -60,7 +60,7 @@ struct RoutingDecisionResponse {
|
|||
|
||||
pub async fn routing_decision(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
router_service: Arc<RouterService>,
|
||||
orchestrator_service: Arc<OrchestratorService>,
|
||||
request_path: String,
|
||||
span_attributes: &Option<SpanAttributes>,
|
||||
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
||||
|
|
@ -76,7 +76,7 @@ pub async fn routing_decision(
|
|||
.and_then(|h| h.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
let tenant_id: Option<String> = router_service
|
||||
let tenant_id: Option<String> = orchestrator_service
|
||||
.tenant_header()
|
||||
.and_then(|hdr| request_headers.get(hdr))
|
||||
.and_then(|v| v.to_str().ok())
|
||||
|
|
@ -94,7 +94,7 @@ pub async fn routing_decision(
|
|||
|
||||
routing_decision_inner(
|
||||
request,
|
||||
router_service,
|
||||
orchestrator_service,
|
||||
request_id,
|
||||
request_path,
|
||||
request_headers,
|
||||
|
|
@ -109,7 +109,7 @@ pub async fn routing_decision(
|
|||
#[allow(clippy::too_many_arguments)]
|
||||
async fn routing_decision_inner(
|
||||
request: Request<hyper::body::Incoming>,
|
||||
router_service: Arc<RouterService>,
|
||||
orchestrator_service: Arc<OrchestratorService>,
|
||||
request_id: String,
|
||||
request_path: String,
|
||||
request_headers: hyper::HeaderMap,
|
||||
|
|
@ -133,9 +133,8 @@ async fn routing_decision_inner(
|
|||
.unwrap_or("unknown")
|
||||
.to_string();
|
||||
|
||||
// Session pinning: check cache before doing any routing work
|
||||
if let Some(ref sid) = session_id {
|
||||
if let Some(cached) = router_service
|
||||
if let Some(cached) = orchestrator_service
|
||||
.get_cached_route(sid, tenant_id.as_deref())
|
||||
.await
|
||||
{
|
||||
|
|
@ -202,9 +201,8 @@ async fn routing_decision_inner(
|
|||
};
|
||||
|
||||
let routing_result = router_chat_get_upstream_model(
|
||||
Arc::clone(&router_service),
|
||||
Arc::clone(&orchestrator_service),
|
||||
client_request,
|
||||
&traceparent,
|
||||
&request_path,
|
||||
&request_id,
|
||||
inline_routing_preferences,
|
||||
|
|
@ -213,9 +211,8 @@ async fn routing_decision_inner(
|
|||
|
||||
match routing_result {
|
||||
Ok(result) => {
|
||||
// Cache the result if session_id is present
|
||||
if let Some(ref sid) = session_id {
|
||||
router_service
|
||||
orchestrator_service
|
||||
.cache_route(
|
||||
sid.clone(),
|
||||
tenant_id.as_deref(),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue