This commit is contained in:
Adil Hafeez 2025-06-11 13:52:59 -07:00
parent b4ab5e41b5
commit 63973a051a
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
2 changed files with 0 additions and 73 deletions

View file

@ -494,38 +494,4 @@ data: [DONE]
"Hello! How can I assist you today? Whether you have a question, need information, or just want to chat about something, I'm here to help. What would you like to talk about?"
);
}
#[test]
fn stream_chunk_parse_gemini() {
const CHUNK_RESPONSE: &str = r#"data: {"choices":[{"delta":{"content":":**\n\n* **Chief Executive:** T"#;
let iter = SseChatCompletionIter::try_from(CHUNK_RESPONSE.as_bytes());
assert!(iter.is_ok(), "Failed to create SSE iterator");
let iter: SseChatCompletionIter<str::Lines<'_>> = iter.unwrap();
let all_text: Vec<String> = iter
.map(|item| {
let response = item.expect("Failed to parse response");
response
.choices
.into_iter()
.filter_map(|choice| choice.delta.content)
.map(|content| content.to_string())
.collect::<String>()
})
.collect();
assert_eq!(
all_text.len(),
1,
"Expected 8 chunks of text, but got {}",
all_text.len()
);
assert_eq!(
all_text.join(""),
"Hello! How can I assist you today? Whether you have a question, need information, or just want to chat about something, I'm here to help. What would you like to talk about?"
);
}
}

View file

@ -570,45 +570,6 @@ impl HttpContext for StreamContext {
let hermes_llm_provider = Provider::from(llm_provider_str.as_str());
if self.streaming_response {
// check if body ends with a valid SSE event
if !body.ends_with(b"\n\n") {
if end_of_stream {
warn!("streaming response body does not end with a valid SSE event, but end of stream is true");
self.send_server_error(
ServerError::LogicError(
"streaming response body does not end with a valid SSE event"
.to_string(),
),
Some(StatusCode::BAD_REQUEST),
);
return Action::Continue;
}
// buffer the body until we have a complete SSE event
debug!("streaming response body does not end with a valid SSE event, buffering the body");
self.streaming_buffer
.get_or_insert_with(Vec::new)
.extend_from_slice(&body);
// we need to wait for the next chunk to complete the SSE event
return Action::Pause;
}
// if streaming_buffer is Some, it means we have buffered data from previous chunks
// otherwise we can process the body directly
// let sse_event_buffer = match self.streaming_buffer.take() {
// Some(buffer) => {
// debug!("streaming response body has buffered data, prepending it to the current chunk");
// let mut complete_body = buffer;
// complete_body.extend_from_slice(&body);
// complete_body
// }
// None => {
// debug!("no buffered data, processing the current chunk directly");
// body
// }
// };
let chat_completions_chunk_response_events =
match SseChatCompletionIter::try_from((body.as_slice(), &hermes_llm_provider)) {
Ok(events) => events,