address pr feedback

This commit is contained in:
Adil Hafeez 2024-11-12 13:05:03 -08:00
parent 3b8a19efe9
commit ee2751be68
8 changed files with 836 additions and 383 deletions

View file

@ -18,8 +18,7 @@ pub struct WasmMetrics {
pub active_http_calls: Gauge,
pub ratelimited_rq: Counter,
pub time_to_first_token: Histogram,
pub time_per_output_token: Histogram,
pub latency: Histogram,
pub request_latency: Histogram,
pub output_sequence_length: Histogram,
pub input_sequence_length: Histogram,
}
@ -30,8 +29,7 @@ impl WasmMetrics {
active_http_calls: Gauge::new(String::from("active_http_calls")),
ratelimited_rq: Counter::new(String::from("ratelimited_rq")),
time_to_first_token: Histogram::new(String::from("time_to_first_token")),
time_per_output_token: Histogram::new(String::from("time_per_output_token")),
latency: Histogram::new(String::from("latency")),
request_latency: Histogram::new(String::from("request_latency")),
output_sequence_length: Histogram::new(String::from("output_sequence_length")),
input_sequence_length: Histogram::new(String::from("input_sequence_length")),
}

View file

@ -36,10 +36,7 @@ pub struct StreamContext {
llm_provider: Option<Rc<LlmProvider>>,
request_id: Option<String>,
start_time: Option<SystemTime>,
ttft_recorded: bool,
ttft_duration: Option<Duration>, // Store the duration directly
first_token_processed: bool,
last_token_time: Option<SystemTime>,
}
impl StreamContext {
@ -55,10 +52,7 @@ impl StreamContext {
llm_provider: None,
request_id: None,
start_time: None,
ttft_recorded: false,
ttft_duration: None,
first_token_processed: false,
last_token_time: None,
}
}
fn llm_provider(&self) -> &LlmProvider {
@ -144,24 +138,12 @@ impl StreamContext {
// Check if rate limiting needs to be applied.
if let Some(selector) = self.ratelimit_selector.take() {
log::debug!("Rate limiting applied for model: {}", model);
let result = ratelimit::ratelimits(None).read().unwrap().check_limit(
log::debug!("Applying ratelimit for model: {}", model);
ratelimit::ratelimits(None).read().unwrap().check_limit(
model.to_owned(),
selector,
NonZero::new(token_count as u32).unwrap(),
);
match result {
Ok(_) => log::debug!("Rate limit check passed for model: {}", model),
Err(e) => {
log::debug!(
"Rate limit check failed for model: {} with error: {:?}",
model,
e
);
return Err(e);
}
}
)?;
} else {
log::debug!("No rate limit applied for model: {}", model);
}
@ -196,17 +178,9 @@ impl HttpContext for StreamContext {
self.request_id = self.get_http_request_header(REQUEST_ID_HEADER);
//start the timing for the request using get_current_time()
match get_current_time() {
Ok(current_time) => {
self.start_time = Some(current_time);
self.ttft_recorded = false;
self.ttft_duration = None;
}
Err(e) => {
warn!("Failed to get current time: {:?}", e);
self.start_time = None;
}
}
let current_time = get_current_time().unwrap();
self.start_time = Some(current_time);
self.ttft_duration = None;
Action::Continue
}
@ -309,21 +283,17 @@ impl HttpContext for StreamContext {
// All streaming responses end with bytes=0 and end_stream=true
// Record the latency for the request
if let Some(start_time) = self.start_time {
match get_current_time() {
Ok(current_time) => match current_time.duration_since(start_time) {
Ok(duration) => {
// Convert the duration to milliseconds
let duration_ms = duration.as_millis();
debug!("Total latency: {} milliseconds", duration_ms);
// Record the latency to the latency histogram
self.metrics.latency.record(duration_ms as u64);
}
Err(e) => {
warn!("SystemTime error: {:?}", e);
}
},
let current_time = get_current_time().unwrap();
match current_time.duration_since(start_time) {
Ok(duration) => {
// Convert the duration to milliseconds
let duration_ms = duration.as_millis();
debug!("Total latency: {} milliseconds", duration_ms);
// Record the latency to the latency histogram
self.metrics.request_latency.record(duration_ms as u64);
}
Err(e) => {
warn!("Failed to get current time: {:?}", e);
warn!("SystemTime error: {:?}", e);
}
}
}
@ -422,73 +392,24 @@ impl HttpContext for StreamContext {
self.response_tokens += token_count;
// Compute TTFT if not already recorded
if !self.ttft_recorded {
if self.ttft_duration.is_none() {
if let Some(start_time) = self.start_time {
match get_current_time() {
Ok(current_time) => match current_time.duration_since(start_time) {
Ok(duration) => {
let duration_ms = duration.as_millis();
debug!("Time to First Token (TTFT): {} milliseconds", duration_ms);
self.ttft_duration = Some(duration);
self.metrics.time_to_first_token.record(duration_ms as u64);
}
Err(e) => {
warn!("SystemTime error: {:?}", e);
}
},
let current_time = get_current_time().unwrap();
match current_time.duration_since(start_time) {
Ok(duration) => {
let duration_ms = duration.as_millis();
debug!("Time to First Token (TTFT): {} milliseconds", duration_ms);
self.ttft_duration = Some(duration);
self.metrics.time_to_first_token.record(duration_ms as u64);
}
Err(e) => {
warn!("Failed to get current time: {:?}", e);
warn!("SystemTime error: {:?}", e);
}
}
self.ttft_recorded = true;
} else {
warn!("Start time was not recorded");
}
}
// Check if first token was not processed yet, and if there are tokens in the response.
// If so, set the last_token_time to now and set first_token_processed to true
if !self.first_token_processed && token_count > 0 {
self.first_token_processed = true;
// Set last_token_time to now
match get_current_time() {
Ok(current_time) => {
self.last_token_time = Some(current_time);
}
Err(e) => {
warn!("Failed to get current time: {:?}", e);
}
}
} else if self.first_token_processed && token_count > 0 {
if let Some(last_token_time) = self.last_token_time {
match get_current_time() {
Ok(current_time) => {
// record the time for the current output token and calculate the time per output token
match current_time.duration_since(last_token_time) {
Ok(duration) => {
// Convert the duration to milliseconds
let duration_ms = duration.as_millis();
debug!(
"Time for Current Output Token: {} milliseconds",
duration_ms as u64 / token_count as u64
);
// Record TPOT metric for historgram
self.metrics
.time_per_output_token
.record((duration_ms as u64) / (token_count as u64));
}
Err(e) => {
warn!("SystemTime error: {:?}", e);
}
}
// Set last_token_time to now
self.last_token_time = Some(current_time);
}
Err(e) => {
warn!("Failed to get current time: {:?}", e);
}
}
}
}
} else {
debug!("non streaming response");
let chat_completions_response: ChatCompletionsResponse =
@ -507,31 +428,6 @@ impl HttpContext for StreamContext {
.unwrap()
.completion_tokens;
}
// // Compute TFT if not already recorded
// if !self.ttft_recorded {
// if let Some(start_time) = self.start_time {
// match get_current_time() {
// Ok(current_time) => match current_time.duration_since(start_time) {
// Ok(duration) => {
// let duration_ms = duration.as_millis();
// debug!("Time to First Token (TFT): {} milliseconds", duration_ms);
// self.ttft_duration = Some(duration);
// self.metrics.time_to_first_token.record(duration_ms as u64);
// }
// Err(e) => {
// warn!("SystemTime error: {:?}", e);
// }
// },
// Err(e) => {
// warn!("Failed to get current time: {:?}", e);
// }
// }
// self.ttft_recorded = true;
// } else {
// warn!("Start time was not recorded");
// }
// }
}
debug!(