remove dead code

This commit is contained in:
Adil Hafeez 2025-05-14 15:33:09 -07:00
parent fa2f85573e
commit f60cac27f4
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
4 changed files with 24 additions and 115 deletions

View file

@ -8,7 +8,7 @@ use common::{
};
use hyper::header;
use thiserror::Error;
use tracing::info;
use tracing::{info, warn};
use super::router_model::RouterModel;
@ -24,8 +24,8 @@ pub enum RoutingError {
#[error("Failed to send request: {0}")]
RequestError(#[from] reqwest::Error),
#[error("Failed to parse JSON: {0}")]
JsonError(#[from] serde_json::Error),
#[error("Failed to parse JSON: {0}, JSON: {1}")]
JsonError(serde_json::Error, String),
#[error("Router model error: {0}")]
RouterModelError(#[from] super::router_model::RoutingModelError),
@ -85,7 +85,7 @@ impl RouterService {
info!(
"router_request: {}",
&serde_json::to_string(&router_request).unwrap()
shorten_string(&serde_json::to_string(&router_request).unwrap()),
);
let mut llm_route_request_headers = header::HeaderMap::new();
@ -116,7 +116,20 @@ impl RouterService {
let body = res.text().await?;
let chat_completion_response: ChatCompletionsResponse = serde_json::from_str(&body)?;
let chat_completion_response: ChatCompletionsResponse = match serde_json::from_str(&body) {
Ok(response) => response,
Err(err) => {
warn!(
"Failed to parse JSON: {}. Body: {}",
err,
&serde_json::to_string(&body).unwrap()
);
return Err(RoutingError::JsonError(
err,
format!("Failed to parse JSON: {}", body),
));
}
};
let selected_llm = self.router_model.parse_response(
chat_completion_response.choices[0]

View file

@ -9,10 +9,9 @@ use common::consts::{
RATELIMIT_SELECTOR_HEADER_KEY, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
};
use common::errors::ServerError;
use common::http::Client;
use common::llm_providers::LlmProviders;
use common::ratelimit::Header;
use common::stats::{Gauge, IncrementingMetric, RecordingMetric};
use common::stats::{IncrementingMetric, RecordingMetric};
use common::tracing::{Event, Span, TraceData, Traceparent};
use common::{ratelimit, routing, tokenizer};
use http::StatusCode;
@ -20,16 +19,12 @@ use log::{debug, info, warn};
use proxy_wasm::hostcalls::get_current_time;
use proxy_wasm::traits::*;
use proxy_wasm::types::*;
use std::cell::RefCell;
use std::collections::{HashMap, VecDeque};
use std::collections::VecDeque;
use std::num::NonZero;
use std::rc::Rc;
use std::sync::{Arc, Mutex};
use std::time::{Duration, SystemTime, UNIX_EPOCH};
/// Per-callout bookkeeping value stored in `StreamContext::callouts` and used as the
/// `CallContext` associated type of the `Client` impl for `StreamContext`.
///
/// The struct has no fields, so an entry records only that a callout with a given
/// key exists; it carries no data of its own.
#[derive(Debug)]
pub struct CallContext {}
pub struct StreamContext {
context_id: u32,
metrics: Rc<Metrics>,
@ -37,7 +32,7 @@ pub struct StreamContext {
streaming_response: bool,
response_tokens: usize,
is_chat_completions_request: bool,
pub(crate) llm_providers: Rc<LlmProviders>,
llm_providers: Rc<LlmProviders>,
llm_provider: Option<Rc<LlmProvider>>,
request_id: Option<String>,
start_time: SystemTime,
@ -48,10 +43,6 @@ pub struct StreamContext {
user_message: Option<Message>,
traces_queue: Arc<Mutex<VecDeque<TraceData>>>,
overrides: Rc<Option<Overrides>>,
pub(crate) request_body: Option<String>,
pub(crate) request_size: Option<usize>,
pub(crate) chat_completion_request: Option<ChatCompletionsRequest>,
callouts: RefCell<HashMap<u32, CallContext>>,
}
impl StreamContext {
@ -80,13 +71,8 @@ impl StreamContext {
user_message: None,
traces_queue,
request_body_sent_time: None,
request_body: None,
request_size: None,
chat_completion_request: None,
callouts: RefCell::new(HashMap::new()),
}
}
fn llm_provider(&self) -> &LlmProvider {
self.llm_provider
.as_ref()
@ -170,7 +156,7 @@ impl StreamContext {
});
}
pub fn send_server_error(&self, error: ServerError, override_status_code: Option<StatusCode>) {
fn send_server_error(&self, error: ServerError, override_status_code: Option<StatusCode>) {
warn!("server error occurred: {}", error);
self.send_http_response(
override_status_code
@ -331,14 +317,6 @@ impl HttpContext for StreamContext {
}
};
// remove metadata from the request body
//TODO: move this to prompt gateway
// deserialized_body.metadata = None;
// delete model key from message array
for message in deserialized_body.messages.iter_mut() {
// message.model = None;
}
self.user_message = deserialized_body
.messages
.iter()
@ -346,35 +324,7 @@ impl HttpContext for StreamContext {
.last()
.cloned();
// let model_name = match self.llm_provider.as_ref() {
// Some(llm_provider) => llm_provider.model.as_ref(),
// None => None,
// };
// let use_agent_orchestrator = match self.overrides.as_ref() {
// Some(overrides) => overrides.use_agent_orchestrator.unwrap_or_default(),
// None => false,
// };
let model_requested = deserialized_body.model.clone();
// if deserialized_body.model.is_empty() || deserialized_body.model.to_lowercase() == "none" {
// deserialized_body.model = match model_name {
// Some(model_name) => model_name.clone(),
// None => {
// if use_agent_orchestrator {
// "agent_orchestrator".to_string()
// } else {
// self.send_server_error(
// ServerError::BadRequest {
// why: format!("No model specified in request and couldn't determine model name from arch_config. Model name in req: {}, arch_config, provider: {}, model: {:?}", deserialized_body.model, self.llm_provider().name, self.llm_provider().model).to_string(),
// },
// Some(StatusCode::BAD_REQUEST),
// );
// return Action::Continue;
// }
// }
// }
// }
info!(
"on_http_request_body: provider: {}, model requested: {}, model selected: {:?}",
@ -421,10 +371,6 @@ impl HttpContext for StreamContext {
self.set_http_request_body(0, body_size, chat_completion_request_str.as_bytes());
self.chat_completion_request = Some(deserialized_body);
self.request_body = Some(chat_completion_request_str);
self.request_size = Some(body_size);
Action::Continue
}
@ -686,50 +632,4 @@ fn current_time_ns() -> u128 {
.as_nanos()
}
/// `Context` implementation that handles responses to HTTP callouts made by this stream.
impl Context for StreamContext {
/// Handles the response to an HTTP callout identified by `token_id`.
///
/// Logs the call, removes the matching entry from `callouts`, logs the response body,
/// and then calls `set_http_request_body` with the saved `request_size` and
/// `request_body`.
///
/// # Panics
///
/// Panics with "invalid token_id" when `callouts` has no entry for `token_id`, and
/// panics on the `unwrap`s below when `request_size` or `request_body` is `None`.
fn on_http_call_response(
&mut self,
token_id: u32,
_num_headers: usize,
body_size: usize,
_num_trailers: usize,
) {
debug!(
"on_http_call_response [S={}] token_id={} num_headers={} body_size={} num_trailers={}",
self.context_id, token_id, _num_headers, body_size, _num_trailers
);
// The removed value is never read; the lookup only checks that the token is known
// and drops its entry from the map.
let _callout_data = self
.callouts
.borrow_mut()
.remove(&token_id)
.expect("invalid token_id");
// A missing body is not treated as an error: `unwrap_or_default` substitutes the
// default value instead.
let body = self
.get_http_call_response_body(0, body_size)
.unwrap_or_default();
// The body is only logged (lossily decoded as UTF-8); its content does not
// influence anything that follows.
info!(
"on_http_call_response: response body: {}",
String::from_utf8_lossy(&body)
);
// Write the stored request body back starting at offset 0, using the size that was
// saved alongside it.
self.set_http_request_body(
0,
self.request_size.unwrap(),
self.request_body.as_ref().unwrap().as_bytes(),
);
}
}
/// `Client` implementation that exposes this stream's callout state through accessors.
// NOTE(review): the `Client` trait itself is defined in `common::http` and is not visible
// here; presumably its provided methods use these accessors to issue and track callouts.
impl Client for StreamContext {
type CallContext = CallContext;
/// Returns the map of callout entries, keyed by `u32`. `on_http_call_response`
/// removes entries from this map by `token_id`.
fn callouts(&self) -> &RefCell<HashMap<u32, Self::CallContext>> {
&self.callouts
}
/// Returns the `active_http_calls` gauge held in this context's `metrics`.
fn active_http_calls(&self) -> &Gauge {
&self.metrics.active_http_calls
}
}
// Empty impl: no `Context` method is overridden, so `StreamContext` uses the trait's
// default implementation for every method.
impl Context for StreamContext {}

View file

@ -103,11 +103,7 @@ impl StreamContext {
}
}
pub(crate) fn send_server_error(
&self,
error: ServerError,
override_status_code: Option<StatusCode>,
) {
pub fn send_server_error(&self, error: ServerError, override_status_code: Option<StatusCode>) {
self.send_http_response(
override_status_code
.unwrap_or(StatusCode::INTERNAL_SERVER_ERROR)