2025-12-11 15:21:57 -08:00
|
|
|
use bytes::Bytes;
|
2026-01-28 17:47:33 -08:00
|
|
|
use common::configuration::ModelAlias;
|
2025-12-25 21:08:37 -08:00
|
|
|
use common::consts::{
|
|
|
|
|
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
|
|
|
|
|
};
|
2026-01-28 17:47:33 -08:00
|
|
|
use common::llm_providers::LlmProviders;
|
2025-12-17 12:18:38 -08:00
|
|
|
use hermesllm::apis::openai_responses::InputParam;
|
|
|
|
|
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
|
2025-12-11 15:21:57 -08:00
|
|
|
use hermesllm::{ProviderRequest, ProviderRequestType};
|
|
|
|
|
use http_body_util::combinators::BoxBody;
|
|
|
|
|
use http_body_util::{BodyExt, Full};
|
|
|
|
|
use hyper::header::{self};
|
|
|
|
|
use hyper::{Request, Response, StatusCode};
|
2026-02-09 13:33:27 -08:00
|
|
|
use opentelemetry::global;
|
|
|
|
|
use opentelemetry::trace::get_active_span;
|
|
|
|
|
use opentelemetry_http::HeaderInjector;
|
2025-12-11 15:21:57 -08:00
|
|
|
use std::collections::HashMap;
|
|
|
|
|
use std::sync::Arc;
|
|
|
|
|
use tokio::sync::RwLock;
|
2026-02-09 13:33:27 -08:00
|
|
|
use tracing::{debug, info, info_span, warn, Instrument};
|
2025-12-11 15:21:57 -08:00
|
|
|
|
|
|
|
|
use crate::handlers::router_chat::router_chat_get_upstream_model;
|
2025-12-25 21:08:37 -08:00
|
|
|
use crate::handlers::utils::{
|
|
|
|
|
create_streaming_response, truncate_message, ObservableStreamProcessor,
|
|
|
|
|
};
|
|
|
|
|
use crate::router::llm_router::RouterService;
|
2025-12-17 12:18:38 -08:00
|
|
|
use crate::state::response_state_processor::ResponsesStateProcessor;
|
|
|
|
|
use crate::state::{
|
2025-12-25 21:08:37 -08:00
|
|
|
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
|
2025-12-17 12:18:38 -08:00
|
|
|
};
|
2026-02-09 13:33:27 -08:00
|
|
|
use crate::tracing::{operation_component, set_service_name};
|
2025-12-11 15:21:57 -08:00
|
|
|
|
|
|
|
|
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
|
|
|
|
|
Full::new(chunk.into())
|
|
|
|
|
.map_err(|never| match never {})
|
|
|
|
|
.boxed()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
pub async fn llm_chat(
|
|
|
|
|
request: Request<hyper::body::Incoming>,
|
|
|
|
|
router_service: Arc<RouterService>,
|
|
|
|
|
full_qualified_llm_provider_url: String,
|
|
|
|
|
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
|
2026-01-28 17:47:33 -08:00
|
|
|
llm_providers: Arc<RwLock<LlmProviders>>,
|
2025-12-17 12:18:38 -08:00
|
|
|
state_storage: Option<Arc<dyn StateStorage>>,
|
2025-12-11 15:21:57 -08:00
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
let request_path = request.uri().path().to_string();
|
|
|
|
|
let request_headers = request.headers().clone();
|
2026-01-07 12:04:10 -08:00
|
|
|
let request_id: String = match request_headers
|
2025-12-17 12:18:38 -08:00
|
|
|
.get(REQUEST_ID_HEADER)
|
|
|
|
|
.and_then(|h| h.to_str().ok())
|
|
|
|
|
.map(|s| s.to_string())
|
2026-01-07 12:04:10 -08:00
|
|
|
{
|
|
|
|
|
Some(id) => id,
|
2026-02-09 13:33:27 -08:00
|
|
|
None => uuid::Uuid::new_v4().to_string(),
|
2026-01-07 12:04:10 -08:00
|
|
|
};
|
2025-12-11 15:21:57 -08:00
|
|
|
|
2026-02-09 13:33:27 -08:00
|
|
|
// Create a span with request_id that will be included in all log lines
|
|
|
|
|
let request_span = info_span!(
|
|
|
|
|
"llm",
|
|
|
|
|
component = "llm",
|
|
|
|
|
request_id = %request_id,
|
|
|
|
|
http.method = %request.method(),
|
|
|
|
|
http.path = %request_path,
|
|
|
|
|
);
|
|
|
|
|
|
|
|
|
|
// Execute the rest of the handler inside the span
|
|
|
|
|
llm_chat_inner(
|
|
|
|
|
request,
|
|
|
|
|
router_service,
|
|
|
|
|
full_qualified_llm_provider_url,
|
|
|
|
|
model_aliases,
|
|
|
|
|
llm_providers,
|
|
|
|
|
state_storage,
|
|
|
|
|
request_id,
|
|
|
|
|
request_path,
|
|
|
|
|
request_headers,
|
|
|
|
|
)
|
|
|
|
|
.instrument(request_span)
|
|
|
|
|
.await
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
#[allow(clippy::too_many_arguments)]
|
|
|
|
|
async fn llm_chat_inner(
|
|
|
|
|
request: Request<hyper::body::Incoming>,
|
|
|
|
|
router_service: Arc<RouterService>,
|
|
|
|
|
full_qualified_llm_provider_url: String,
|
|
|
|
|
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
|
|
|
|
|
llm_providers: Arc<RwLock<LlmProviders>>,
|
|
|
|
|
state_storage: Option<Arc<dyn StateStorage>>,
|
|
|
|
|
request_id: String,
|
|
|
|
|
request_path: String,
|
|
|
|
|
mut request_headers: hyper::HeaderMap,
|
|
|
|
|
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
|
|
|
|
|
// Set service name for LLM operations
|
|
|
|
|
set_service_name(operation_component::LLM);
|
|
|
|
|
|
2025-12-11 15:21:57 -08:00
|
|
|
// Extract or generate traceparent - this establishes the trace context for all spans
|
2026-01-07 12:04:10 -08:00
|
|
|
let traceparent: String = match request_headers
|
2025-12-17 12:18:38 -08:00
|
|
|
.get(TRACE_PARENT_HEADER)
|
2025-12-11 15:21:57 -08:00
|
|
|
.and_then(|h| h.to_str().ok())
|
|
|
|
|
.map(|s| s.to_string())
|
2026-01-07 12:04:10 -08:00
|
|
|
{
|
|
|
|
|
Some(tp) => tp,
|
|
|
|
|
None => {
|
2025-12-11 15:21:57 -08:00
|
|
|
use uuid::Uuid;
|
|
|
|
|
let trace_id = Uuid::new_v4().to_string().replace("-", "");
|
2026-01-07 12:04:10 -08:00
|
|
|
let generated_tp = format!("00-{}-0000000000000000-01", trace_id);
|
|
|
|
|
warn!(
|
2026-02-09 13:33:27 -08:00
|
|
|
generated_traceparent = %generated_tp,
|
|
|
|
|
"TRACE_PARENT header missing, generated new traceparent"
|
2026-01-07 12:04:10 -08:00
|
|
|
);
|
|
|
|
|
generated_tp
|
|
|
|
|
}
|
|
|
|
|
};
|
2025-12-11 15:21:57 -08:00
|
|
|
|
|
|
|
|
let chat_request_bytes = request.collect().await?.to_bytes();
|
|
|
|
|
|
|
|
|
|
debug!(
|
2026-02-09 13:33:27 -08:00
|
|
|
body = %String::from_utf8_lossy(&chat_request_bytes),
|
|
|
|
|
"request body received"
|
2025-12-11 15:21:57 -08:00
|
|
|
);
|
|
|
|
|
|
|
|
|
|
let mut client_request = match ProviderRequestType::try_from((
|
|
|
|
|
&chat_request_bytes[..],
|
|
|
|
|
&SupportedAPIsFromClient::from_endpoint(request_path.as_str()).unwrap(),
|
|
|
|
|
)) {
|
|
|
|
|
Ok(request) => request,
|
|
|
|
|
Err(err) => {
|
2025-12-25 21:08:37 -08:00
|
|
|
warn!(
|
2026-02-09 13:33:27 -08:00
|
|
|
error = %err,
|
|
|
|
|
"failed to parse request as ProviderRequestType"
|
2025-12-25 21:08:37 -08:00
|
|
|
);
|
2026-02-09 13:33:27 -08:00
|
|
|
let err_msg = format!("Failed to parse request: {}", err);
|
2025-12-11 15:21:57 -08:00
|
|
|
let mut bad_request = Response::new(full(err_msg));
|
|
|
|
|
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
|
|
|
|
return Ok(bad_request);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2025-12-17 12:18:38 -08:00
|
|
|
// === v1/responses state management: Extract input items early ===
|
|
|
|
|
let mut original_input_items = Vec::new();
|
|
|
|
|
let client_api = SupportedAPIsFromClient::from_endpoint(request_path.as_str());
|
2025-12-25 21:08:37 -08:00
|
|
|
let is_responses_api_client = matches!(
|
|
|
|
|
client_api,
|
|
|
|
|
Some(SupportedAPIsFromClient::OpenAIResponsesAPI(_))
|
|
|
|
|
);
|
2025-12-17 12:18:38 -08:00
|
|
|
|
2025-12-11 15:21:57 -08:00
|
|
|
// Model alias resolution: update model field in client_request immediately
|
|
|
|
|
// This ensures all downstream objects use the resolved model
|
|
|
|
|
let model_from_request = client_request.model().to_string();
|
2026-02-09 13:33:27 -08:00
|
|
|
let _temperature = client_request.get_temperature();
|
2025-12-11 15:21:57 -08:00
|
|
|
let is_streaming_request = client_request.is_streaming();
|
2026-02-09 13:33:27 -08:00
|
|
|
let alias_resolved_model = resolve_model_alias(&model_from_request, &model_aliases);
|
2025-12-11 15:21:57 -08:00
|
|
|
|
2026-01-28 17:47:33 -08:00
|
|
|
// Validate that the requested model exists in configuration
|
|
|
|
|
// This matches the validation in llm_gateway routing.rs
|
2026-02-09 13:33:27 -08:00
|
|
|
if llm_providers
|
|
|
|
|
.read()
|
|
|
|
|
.await
|
|
|
|
|
.get(&alias_resolved_model)
|
|
|
|
|
.is_none()
|
|
|
|
|
{
|
2026-01-28 17:47:33 -08:00
|
|
|
let err_msg = format!(
|
|
|
|
|
"Model '{}' not found in configured providers",
|
2026-02-09 13:33:27 -08:00
|
|
|
alias_resolved_model
|
2026-01-28 17:47:33 -08:00
|
|
|
);
|
2026-02-09 13:33:27 -08:00
|
|
|
warn!(model = %alias_resolved_model, "model not found in configured providers");
|
2026-01-28 17:47:33 -08:00
|
|
|
let mut bad_request = Response::new(full(err_msg));
|
|
|
|
|
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
|
|
|
|
|
return Ok(bad_request);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Handle provider/model slug format (e.g., "openai/gpt-4")
|
|
|
|
|
// Extract just the model name for upstream (providers don't understand the slug)
|
2026-02-09 13:33:27 -08:00
|
|
|
let model_name_only = if let Some((_, model)) = alias_resolved_model.split_once('/') {
|
2026-01-28 17:47:33 -08:00
|
|
|
model.to_string()
|
|
|
|
|
} else {
|
2026-02-09 13:33:27 -08:00
|
|
|
alias_resolved_model.clone()
|
2026-01-28 17:47:33 -08:00
|
|
|
};
|
|
|
|
|
|
2025-12-11 15:21:57 -08:00
|
|
|
// Extract tool names and user message preview for span attributes
|
2026-02-09 13:33:27 -08:00
|
|
|
let _tool_names = client_request.get_tool_names();
|
|
|
|
|
let _user_message_preview = client_request
|
2025-12-25 21:08:37 -08:00
|
|
|
.get_recent_user_message()
|
2025-12-11 15:21:57 -08:00
|
|
|
.map(|msg| truncate_message(&msg, 50));
|
|
|
|
|
|
2026-01-07 11:20:44 -08:00
|
|
|
// Extract messages for signal analysis (clone before moving client_request)
|
2026-02-09 13:33:27 -08:00
|
|
|
let messages_for_signals = Some(client_request.get_messages());
|
2026-01-07 11:20:44 -08:00
|
|
|
|
2026-01-28 17:47:33 -08:00
|
|
|
// Set the model to just the model name (without provider prefix)
|
|
|
|
|
// This ensures upstream receives "gpt-4" not "openai/gpt-4"
|
|
|
|
|
client_request.set_model(model_name_only.clone());
|
2025-12-11 15:21:57 -08:00
|
|
|
if client_request.remove_metadata_key("archgw_preference_config") {
|
2026-02-09 13:33:27 -08:00
|
|
|
debug!("removed archgw_preference_config from metadata");
|
2025-12-17 12:18:38 -08:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// === v1/responses state management: Determine upstream API and combine input if needed ===
|
|
|
|
|
// Do this BEFORE routing since routing consumes the request
|
|
|
|
|
// Only process state if state_storage is configured
|
|
|
|
|
let mut should_manage_state = false;
|
2025-12-25 21:08:37 -08:00
|
|
|
if is_responses_api_client {
|
|
|
|
|
if let (
|
|
|
|
|
ProviderRequestType::ResponsesAPIRequest(ref mut responses_req),
|
|
|
|
|
Some(ref state_store),
|
|
|
|
|
) = (&mut client_request, &state_storage)
|
|
|
|
|
{
|
2025-12-17 12:18:38 -08:00
|
|
|
// Extract original input once
|
|
|
|
|
original_input_items = extract_input_items(&responses_req.input);
|
|
|
|
|
|
|
|
|
|
// Get the upstream path and check if it's ResponsesAPI
|
|
|
|
|
let upstream_path = get_upstream_path(
|
|
|
|
|
&llm_providers,
|
2026-02-09 13:33:27 -08:00
|
|
|
&alias_resolved_model,
|
2025-12-17 12:18:38 -08:00
|
|
|
&request_path,
|
2026-02-09 13:33:27 -08:00
|
|
|
&alias_resolved_model,
|
2025-12-17 12:18:38 -08:00
|
|
|
is_streaming_request,
|
2025-12-25 21:08:37 -08:00
|
|
|
)
|
|
|
|
|
.await;
|
2025-12-17 12:18:38 -08:00
|
|
|
|
|
|
|
|
let upstream_api = SupportedUpstreamAPIs::from_endpoint(&upstream_path);
|
|
|
|
|
|
|
|
|
|
// Only manage state if upstream is NOT OpenAIResponsesAPI (needs translation)
|
2025-12-25 21:08:37 -08:00
|
|
|
should_manage_state = !matches!(
|
|
|
|
|
upstream_api,
|
|
|
|
|
Some(SupportedUpstreamAPIs::OpenAIResponsesAPI(_))
|
|
|
|
|
);
|
2025-12-17 12:18:38 -08:00
|
|
|
|
|
|
|
|
if should_manage_state {
|
|
|
|
|
// Retrieve and combine conversation history if previous_response_id exists
|
|
|
|
|
if let Some(ref prev_resp_id) = responses_req.previous_response_id {
|
|
|
|
|
match retrieve_and_combine_input(
|
2025-12-25 21:08:37 -08:00
|
|
|
state_store.clone(),
|
2025-12-17 12:18:38 -08:00
|
|
|
prev_resp_id,
|
|
|
|
|
original_input_items, // Pass ownership instead of cloning
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(combined_input) => {
|
|
|
|
|
// Update both the request and original_input_items
|
|
|
|
|
responses_req.input = InputParam::Items(combined_input.clone());
|
|
|
|
|
original_input_items = combined_input;
|
2026-02-09 13:33:27 -08:00
|
|
|
info!(
|
|
|
|
|
items = original_input_items.len(),
|
|
|
|
|
"updated request with conversation history"
|
|
|
|
|
);
|
2025-12-17 12:18:38 -08:00
|
|
|
}
|
|
|
|
|
Err(StateStorageError::NotFound(_)) => {
|
|
|
|
|
// Return 409 Conflict when previous_response_id not found
|
2026-02-09 13:33:27 -08:00
|
|
|
warn!(previous_response_id = %prev_resp_id, "previous response_id not found");
|
2025-12-17 12:18:38 -08:00
|
|
|
let err_msg = format!(
|
2026-02-09 13:33:27 -08:00
|
|
|
"Conversation state not found for previous_response_id: {}",
|
|
|
|
|
prev_resp_id
|
2025-12-17 12:18:38 -08:00
|
|
|
);
|
|
|
|
|
let mut conflict_response = Response::new(full(err_msg));
|
|
|
|
|
*conflict_response.status_mut() = StatusCode::CONFLICT;
|
|
|
|
|
return Ok(conflict_response);
|
|
|
|
|
}
|
|
|
|
|
Err(e) => {
|
|
|
|
|
// Log warning but continue on other storage errors
|
|
|
|
|
warn!(
|
2026-02-09 13:33:27 -08:00
|
|
|
previous_response_id = %prev_resp_id,
|
|
|
|
|
error = %e,
|
|
|
|
|
"failed to retrieve conversation state"
|
2025-12-17 12:18:38 -08:00
|
|
|
);
|
|
|
|
|
// Restore original_input_items since we passed ownership
|
|
|
|
|
original_input_items = extract_input_items(&responses_req.input);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
} else {
|
2026-02-09 13:33:27 -08:00
|
|
|
debug!("upstream supports ResponsesAPI natively");
|
2025-12-17 12:18:38 -08:00
|
|
|
}
|
|
|
|
|
}
|
2025-12-11 15:21:57 -08:00
|
|
|
}
|
|
|
|
|
|
2025-12-17 12:18:38 -08:00
|
|
|
// Serialize request for upstream BEFORE router consumes it
|
2025-12-11 15:21:57 -08:00
|
|
|
let client_request_bytes_for_upstream = ProviderRequestType::to_bytes(&client_request).unwrap();
|
|
|
|
|
|
|
|
|
|
// Determine routing using the dedicated router_chat module
|
2026-02-09 13:33:27 -08:00
|
|
|
// This gets its own span for latency and error tracking
|
|
|
|
|
let routing_span = info_span!(
|
|
|
|
|
"routing",
|
|
|
|
|
component = "routing",
|
|
|
|
|
http.method = "POST",
|
|
|
|
|
http.target = %request_path,
|
|
|
|
|
model.requested = %model_from_request,
|
|
|
|
|
model.alias_resolved = %alias_resolved_model,
|
|
|
|
|
route.selected_model = tracing::field::Empty,
|
|
|
|
|
routing.determination_ms = tracing::field::Empty,
|
|
|
|
|
);
|
|
|
|
|
let routing_result = match async {
|
|
|
|
|
set_service_name(operation_component::ROUTING);
|
|
|
|
|
router_chat_get_upstream_model(
|
|
|
|
|
router_service,
|
|
|
|
|
client_request, // Pass the original request - router_chat will convert it
|
|
|
|
|
&traceparent,
|
|
|
|
|
&request_path,
|
|
|
|
|
&request_id,
|
|
|
|
|
)
|
|
|
|
|
.await
|
|
|
|
|
}
|
|
|
|
|
.instrument(routing_span)
|
2025-12-11 15:21:57 -08:00
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(result) => result,
|
|
|
|
|
Err(err) => {
|
|
|
|
|
let mut internal_error = Response::new(full(err.message));
|
|
|
|
|
*internal_error.status_mut() = err.status_code;
|
|
|
|
|
return Ok(internal_error);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
2026-01-28 17:47:33 -08:00
|
|
|
// Determine final model to use
|
|
|
|
|
// Router returns "none" as a sentinel value when it doesn't select a specific model
|
|
|
|
|
let router_selected_model = routing_result.model_name;
|
2026-02-09 13:33:27 -08:00
|
|
|
let resolved_model = if router_selected_model != "none" {
|
2026-01-28 17:47:33 -08:00
|
|
|
// Router selected a specific model via routing preferences
|
|
|
|
|
router_selected_model
|
|
|
|
|
} else {
|
|
|
|
|
// Router returned "none" sentinel, use validated resolved_model from request
|
2026-02-09 13:33:27 -08:00
|
|
|
alias_resolved_model.clone()
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
let span_name = if model_from_request == resolved_model {
|
|
|
|
|
format!("POST {} {}", request_path, resolved_model)
|
|
|
|
|
} else {
|
|
|
|
|
format!(
|
|
|
|
|
"POST {} {} -> {}",
|
|
|
|
|
request_path, model_from_request, resolved_model
|
|
|
|
|
)
|
2026-01-28 17:47:33 -08:00
|
|
|
};
|
2026-02-09 13:33:27 -08:00
|
|
|
get_active_span(|span| {
|
|
|
|
|
span.update_name(span_name.clone());
|
|
|
|
|
});
|
2025-12-11 15:21:57 -08:00
|
|
|
|
|
|
|
|
debug!(
|
2026-02-09 13:33:27 -08:00
|
|
|
url = %full_qualified_llm_provider_url,
|
|
|
|
|
provider_hint = %resolved_model,
|
|
|
|
|
upstream_model = %model_name_only,
|
|
|
|
|
"Routing to upstream"
|
2025-12-11 15:21:57 -08:00
|
|
|
);
|
|
|
|
|
|
|
|
|
|
request_headers.insert(
|
|
|
|
|
ARCH_PROVIDER_HINT_HEADER,
|
2026-02-09 13:33:27 -08:00
|
|
|
header::HeaderValue::from_str(&resolved_model).unwrap(),
|
2025-12-11 15:21:57 -08:00
|
|
|
);
|
|
|
|
|
|
|
|
|
|
request_headers.insert(
|
|
|
|
|
header::HeaderName::from_static(ARCH_IS_STREAMING_HEADER),
|
|
|
|
|
header::HeaderValue::from_str(&is_streaming_request.to_string()).unwrap(),
|
|
|
|
|
);
|
|
|
|
|
// remove content-length header if it exists
|
|
|
|
|
request_headers.remove(header::CONTENT_LENGTH);
|
|
|
|
|
|
2026-02-09 13:33:27 -08:00
|
|
|
// Inject current LLM span's trace context so upstream spans are children of plano(llm)
|
|
|
|
|
global::get_text_map_propagator(|propagator| {
|
|
|
|
|
let cx = tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
|
|
|
|
|
propagator.inject_context(&cx, &mut HeaderInjector(&mut request_headers));
|
|
|
|
|
});
|
|
|
|
|
|
2025-12-11 15:21:57 -08:00
|
|
|
// Capture start time right before sending request to upstream
|
|
|
|
|
let request_start_time = std::time::Instant::now();
|
2026-02-09 13:33:27 -08:00
|
|
|
let _request_start_system_time = std::time::SystemTime::now();
|
2025-12-11 15:21:57 -08:00
|
|
|
|
|
|
|
|
let llm_response = match reqwest::Client::new()
|
2026-02-09 13:33:27 -08:00
|
|
|
.post(&full_qualified_llm_provider_url)
|
2025-12-11 15:21:57 -08:00
|
|
|
.headers(request_headers)
|
|
|
|
|
.body(client_request_bytes_for_upstream)
|
|
|
|
|
.send()
|
|
|
|
|
.await
|
|
|
|
|
{
|
|
|
|
|
Ok(res) => res,
|
|
|
|
|
Err(err) => {
|
|
|
|
|
let err_msg = format!("Failed to send request: {}", err);
|
|
|
|
|
let mut internal_error = Response::new(full(err_msg));
|
|
|
|
|
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
|
|
|
|
return Ok(internal_error);
|
|
|
|
|
}
|
|
|
|
|
};
|
|
|
|
|
|
|
|
|
|
// copy over the headers and status code from the original response
|
|
|
|
|
let response_headers = llm_response.headers().clone();
|
|
|
|
|
let upstream_status = llm_response.status();
|
|
|
|
|
let mut response = Response::builder().status(upstream_status);
|
|
|
|
|
let headers = response.headers_mut().unwrap();
|
|
|
|
|
for (header_name, header_value) in response_headers.iter() {
|
|
|
|
|
headers.insert(header_name, header_value.clone());
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Build LLM span with actual status code using constants
|
|
|
|
|
let byte_stream = llm_response.bytes_stream();
|
|
|
|
|
|
2025-12-17 12:18:38 -08:00
|
|
|
// Create base processor for metrics and tracing
|
|
|
|
|
let base_processor = ObservableStreamProcessor::new(
|
2025-12-11 15:21:57 -08:00
|
|
|
operation_component::LLM,
|
2026-02-09 13:33:27 -08:00
|
|
|
span_name,
|
2025-12-11 15:21:57 -08:00
|
|
|
request_start_time,
|
2026-02-09 13:33:27 -08:00
|
|
|
messages_for_signals,
|
2025-12-11 15:21:57 -08:00
|
|
|
);
|
|
|
|
|
|
2025-12-17 12:18:38 -08:00
|
|
|
// === v1/responses state management: Wrap with ResponsesStateProcessor ===
|
|
|
|
|
// Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI AND state_storage is configured)
|
2025-12-25 21:08:37 -08:00
|
|
|
let streaming_response = if let (true, false, Some(state_store)) = (
|
|
|
|
|
should_manage_state,
|
|
|
|
|
original_input_items.is_empty(),
|
|
|
|
|
state_storage,
|
|
|
|
|
) {
|
2025-12-17 12:18:38 -08:00
|
|
|
// Extract Content-Encoding header to handle decompression for state parsing
|
|
|
|
|
let content_encoding = response_headers
|
|
|
|
|
.get("content-encoding")
|
|
|
|
|
.and_then(|v| v.to_str().ok())
|
|
|
|
|
.map(|s| s.to_string());
|
|
|
|
|
|
|
|
|
|
// Wrap with state management processor to store state after response completes
|
|
|
|
|
let state_processor = ResponsesStateProcessor::new(
|
|
|
|
|
base_processor,
|
2025-12-25 21:08:37 -08:00
|
|
|
state_store,
|
2025-12-17 12:18:38 -08:00
|
|
|
original_input_items,
|
2026-02-09 13:33:27 -08:00
|
|
|
alias_resolved_model.clone(),
|
2025-12-17 12:18:38 -08:00
|
|
|
resolved_model.clone(),
|
|
|
|
|
is_streaming_request,
|
|
|
|
|
false, // Not OpenAI upstream since should_manage_state is true
|
|
|
|
|
content_encoding,
|
2026-01-07 12:04:10 -08:00
|
|
|
request_id,
|
2025-12-17 12:18:38 -08:00
|
|
|
);
|
|
|
|
|
create_streaming_response(byte_stream, state_processor, 16)
|
|
|
|
|
} else {
|
|
|
|
|
// Use base processor without state management
|
|
|
|
|
create_streaming_response(byte_stream, base_processor, 16)
|
|
|
|
|
};
|
2025-12-11 15:21:57 -08:00
|
|
|
|
|
|
|
|
match response.body(streaming_response.body) {
|
|
|
|
|
Ok(response) => Ok(response),
|
|
|
|
|
Err(err) => {
|
|
|
|
|
let err_msg = format!("Failed to create response: {}", err);
|
|
|
|
|
let mut internal_error = Response::new(full(err_msg));
|
|
|
|
|
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
|
|
|
|
|
Ok(internal_error)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Resolves model aliases by looking up the requested model in the model_aliases map.
|
|
|
|
|
/// Returns the target model if an alias is found, otherwise returns the original model.
|
|
|
|
|
fn resolve_model_alias(
|
|
|
|
|
model_from_request: &str,
|
|
|
|
|
model_aliases: &Arc<Option<HashMap<String, ModelAlias>>>,
|
|
|
|
|
) -> String {
|
|
|
|
|
if let Some(aliases) = model_aliases.as_ref() {
|
|
|
|
|
if let Some(model_alias) = aliases.get(model_from_request) {
|
|
|
|
|
debug!(
|
|
|
|
|
"Model Alias: 'From {}' -> 'To {}'",
|
|
|
|
|
model_from_request, model_alias.target
|
|
|
|
|
);
|
|
|
|
|
return model_alias.target.clone();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
model_from_request.to_string()
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/// Calculates the upstream path for the provider based on the model name.
|
|
|
|
|
/// Looks up provider configuration, gets the ProviderId and base_url_path_prefix,
|
|
|
|
|
/// then uses target_endpoint_for_provider to calculate the correct upstream path.
|
|
|
|
|
async fn get_upstream_path(
|
2026-01-28 17:47:33 -08:00
|
|
|
llm_providers: &Arc<RwLock<LlmProviders>>,
|
2025-12-11 15:21:57 -08:00
|
|
|
model_name: &str,
|
|
|
|
|
request_path: &str,
|
|
|
|
|
resolved_model: &str,
|
|
|
|
|
is_streaming: bool,
|
|
|
|
|
) -> String {
|
2025-12-17 12:18:38 -08:00
|
|
|
let (provider_id, base_url_path_prefix) = get_provider_info(llm_providers, model_name).await;
|
2025-12-11 15:21:57 -08:00
|
|
|
|
|
|
|
|
// Calculate the upstream path using the proper API
|
|
|
|
|
let client_api = SupportedAPIsFromClient::from_endpoint(request_path)
|
|
|
|
|
.expect("Should have valid API endpoint");
|
|
|
|
|
|
|
|
|
|
client_api.target_endpoint_for_provider(
|
|
|
|
|
&provider_id,
|
|
|
|
|
request_path,
|
|
|
|
|
resolved_model,
|
|
|
|
|
is_streaming,
|
|
|
|
|
base_url_path_prefix.as_deref(),
|
|
|
|
|
)
|
|
|
|
|
}
|
2025-12-17 12:18:38 -08:00
|
|
|
|
|
|
|
|
/// Helper function to get provider info (ProviderId and base_url_path_prefix)
|
|
|
|
|
async fn get_provider_info(
|
2026-01-28 17:47:33 -08:00
|
|
|
llm_providers: &Arc<RwLock<LlmProviders>>,
|
2025-12-17 12:18:38 -08:00
|
|
|
model_name: &str,
|
|
|
|
|
) -> (hermesllm::ProviderId, Option<String>) {
|
|
|
|
|
let providers_lock = llm_providers.read().await;
|
|
|
|
|
|
2026-01-28 17:47:33 -08:00
|
|
|
// Try to find by model name or provider name using LlmProviders::get
|
|
|
|
|
// This handles both "gpt-4" and "openai/gpt-4" formats
|
|
|
|
|
if let Some(provider) = providers_lock.get(model_name) {
|
2025-12-17 12:18:38 -08:00
|
|
|
let provider_id = provider.provider_interface.to_provider_id();
|
|
|
|
|
let prefix = provider.base_url_path_prefix.clone();
|
|
|
|
|
return (provider_id, prefix);
|
|
|
|
|
}
|
|
|
|
|
|
2026-01-28 17:47:33 -08:00
|
|
|
// Fall back to default provider
|
|
|
|
|
if let Some(provider) = providers_lock.default() {
|
2025-12-17 12:18:38 -08:00
|
|
|
let provider_id = provider.provider_interface.to_provider_id();
|
|
|
|
|
let prefix = provider.base_url_path_prefix.clone();
|
|
|
|
|
(provider_id, prefix)
|
|
|
|
|
} else {
|
|
|
|
|
// Last resort: use OpenAI as hardcoded fallback
|
|
|
|
|
warn!("No default provider found, falling back to OpenAI");
|
|
|
|
|
(hermesllm::ProviderId::OpenAI, None)
|
|
|
|
|
}
|
|
|
|
|
}
|