From f60cac27f4511ce470023a530f295f00941112c2 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Wed, 14 May 2025 15:33:09 -0700 Subject: [PATCH] remove dead code --- crates/brightstaff/src/router/llm_router.rs | 23 +++- crates/llm_gateway/src/stream_context.rs | 110 +----------------- crates/prompt_gateway/src/stream_context.rs | 6 +- .../{test.rest => test_router_endpoint.rest} | 0 4 files changed, 24 insertions(+), 115 deletions(-) rename demos/use_cases/preference_based_routing/{test.rest => test_router_endpoint.rest} (100%) diff --git a/crates/brightstaff/src/router/llm_router.rs b/crates/brightstaff/src/router/llm_router.rs index e9f988ad..12a21ae0 100644 --- a/crates/brightstaff/src/router/llm_router.rs +++ b/crates/brightstaff/src/router/llm_router.rs @@ -8,7 +8,7 @@ use common::{ }; use hyper::header; use thiserror::Error; -use tracing::info; +use tracing::{info, warn}; use super::router_model::RouterModel; @@ -24,8 +24,8 @@ pub enum RoutingError { #[error("Failed to send request: {0}")] RequestError(#[from] reqwest::Error), - #[error("Failed to parse JSON: {0}")] - JsonError(#[from] serde_json::Error), + #[error("Failed to parse JSON: {0}, JSON: {1}")] + JsonError(serde_json::Error, String), #[error("Router model error: {0}")] RouterModelError(#[from] super::router_model::RoutingModelError), @@ -85,7 +85,7 @@ impl RouterService { info!( "router_request: {}", - &serde_json::to_string(&router_request).unwrap() + shorten_string(&serde_json::to_string(&router_request).unwrap()), ); let mut llm_route_request_headers = header::HeaderMap::new(); @@ -116,7 +116,20 @@ impl RouterService { let body = res.text().await?; - let chat_completion_response: ChatCompletionsResponse = serde_json::from_str(&body)?; + let chat_completion_response: ChatCompletionsResponse = match serde_json::from_str(&body) { + Ok(response) => response, + Err(err) => { + warn!( + "Failed to parse JSON: {}. Body: {}", + err, + &serde_json::to_string(&body).unwrap() + ); + return Err(RoutingError::JsonError( + err, + format!("Failed to parse JSON: {}", body), + )); + } + }; let selected_llm = self.router_model.parse_response( chat_completion_response.choices[0] diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 446c2bcd..10192984 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -9,10 +9,9 @@ use common::consts::{ RATELIMIT_SELECTOR_HEADER_KEY, REQUEST_ID_HEADER, TRACE_PARENT_HEADER, }; use common::errors::ServerError; -use common::http::Client; use common::llm_providers::LlmProviders; use common::ratelimit::Header; -use common::stats::{Gauge, IncrementingMetric, RecordingMetric}; +use common::stats::{IncrementingMetric, RecordingMetric}; use common::tracing::{Event, Span, TraceData, Traceparent}; use common::{ratelimit, routing, tokenizer}; use http::StatusCode; @@ -20,16 +19,12 @@ use log::{debug, info, warn}; use proxy_wasm::hostcalls::get_current_time; use proxy_wasm::traits::*; use proxy_wasm::types::*; -use std::cell::RefCell; -use std::collections::{HashMap, VecDeque}; +use std::collections::VecDeque; use std::num::NonZero; use std::rc::Rc; use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; -#[derive(Debug)] -pub struct CallContext {} - pub struct StreamContext { context_id: u32, metrics: Rc, @@ -37,7 +32,7 @@ pub struct StreamContext { streaming_response: bool, response_tokens: usize, is_chat_completions_request: bool, - pub(crate) llm_providers: Rc, + llm_providers: Rc, llm_provider: Option>, request_id: Option, start_time: SystemTime, @@ -48,10 +43,6 @@ pub struct StreamContext { user_message: Option, traces_queue: Arc>>, overrides: Rc>, - pub(crate) request_body: Option, - pub(crate) request_size: Option, - pub(crate) chat_completion_request: Option, - callouts: RefCell>, } impl StreamContext { @@ -80,13 +71,8 @@ impl StreamContext { user_message: None, traces_queue, request_body_sent_time: None, - request_body: None, - request_size: None, - chat_completion_request: None, - callouts: RefCell::new(HashMap::new()), } } - fn llm_provider(&self) -> &LlmProvider { self.llm_provider .as_ref() @@ -170,7 +156,7 @@ impl StreamContext { }); } - pub fn send_server_error(&self, error: ServerError, override_status_code: Option) { + fn send_server_error(&self, error: ServerError, override_status_code: Option) { warn!("server error occurred: {}", error); self.send_http_response( override_status_code @@ -331,14 +317,6 @@ impl HttpContext for StreamContext { } }; - // remove metadata from the request body - //TODO: move this to prompt gateway - // deserialized_body.metadata = None; - // delete model key from message array - for message in deserialized_body.messages.iter_mut() { - // message.model = None; - } - self.user_message = deserialized_body .messages .iter() @@ -346,35 +324,7 @@ impl HttpContext for StreamContext { .last() .cloned(); - // let model_name = match self.llm_provider.as_ref() { - // Some(llm_provider) => llm_provider.model.as_ref(), - // None => None, - // }; - - // let use_agent_orchestrator = match self.overrides.as_ref() { - // Some(overrides) => overrides.use_agent_orchestrator.unwrap_or_default(), - // None => false, - // }; - let model_requested = deserialized_body.model.clone(); - // if deserialized_body.model.is_empty() || deserialized_body.model.to_lowercase() == "none" { - // deserialized_body.model = match model_name { - // Some(model_name) => model_name.clone(), - // None => { - // if use_agent_orchestrator { - // "agent_orchestrator".to_string() - // } else { - // self.send_server_error( - // ServerError::BadRequest { - // why: format!("No model specified in request and couldn't determine model name from arch_config. Model name in req: {}, arch_config, provider: {}, model: {:?}", deserialized_body.model, self.llm_provider().name, self.llm_provider().model).to_string(), - // }, - // Some(StatusCode::BAD_REQUEST), - // ); - // return Action::Continue; - // } - // } - // } - // } info!( "on_http_request_body: provider: {}, model requested: {}, model selected: {:?}", @@ -421,10 +371,6 @@ impl HttpContext for StreamContext { self.set_http_request_body(0, body_size, chat_completion_request_str.as_bytes()); - self.chat_completion_request = Some(deserialized_body); - self.request_body = Some(chat_completion_request_str); - self.request_size = Some(body_size); - Action::Continue } @@ -686,50 +632,4 @@ fn current_time_ns() -> u128 { .as_nanos() } -impl Context for StreamContext { - fn on_http_call_response( - &mut self, - token_id: u32, - _num_headers: usize, - body_size: usize, - _num_trailers: usize, - ) { - debug!( - "on_http_call_response [S={}] token_id={} num_headers={} body_size={} num_trailers={}", - self.context_id, token_id, _num_headers, body_size, _num_trailers - ); - - let _callout_data = self - .callouts - .borrow_mut() - .remove(&token_id) - .expect("invalid token_id"); - - let body = self - .get_http_call_response_body(0, body_size) - .unwrap_or_default(); - - info!( - "on_http_call_response: response body: {}", - String::from_utf8_lossy(&body) - ); - - self.set_http_request_body( - 0, - self.request_size.unwrap(), - self.request_body.as_ref().unwrap().as_bytes(), - ); - } -} - -impl Client for StreamContext { - type CallContext = CallContext; - - fn callouts(&self) -> &RefCell> { - &self.callouts - } - - fn active_http_calls(&self) -> &Gauge { - &self.metrics.active_http_calls - } -} +impl Context for StreamContext {} diff --git a/crates/prompt_gateway/src/stream_context.rs b/crates/prompt_gateway/src/stream_context.rs index 3f486862..0ceb4aa5 100644 --- a/crates/prompt_gateway/src/stream_context.rs +++ b/crates/prompt_gateway/src/stream_context.rs @@ -103,11 +103,7 @@ impl StreamContext { } } - pub(crate) fn send_server_error( - &self, - error: ServerError, - override_status_code: Option, - ) { + pub fn send_server_error(&self, error: ServerError, override_status_code: Option) { self.send_http_response( override_status_code .unwrap_or(StatusCode::INTERNAL_SERVER_ERROR) diff --git a/demos/use_cases/preference_based_routing/test.rest b/demos/use_cases/preference_based_routing/test_router_endpoint.rest similarity index 100% rename from demos/use_cases/preference_based_routing/test.rest rename to demos/use_cases/preference_based_routing/test_router_endpoint.rest