diff --git a/crates/hermesllm/src/providers/openai/types.rs b/crates/hermesllm/src/providers/openai/types.rs index 170eec42..6f4e38d7 100644 --- a/crates/hermesllm/src/providers/openai/types.rs +++ b/crates/hermesllm/src/providers/openai/types.rs @@ -494,38 +494,4 @@ data: [DONE] "Hello! How can I assist you today? Whether you have a question, need information, or just want to chat about something, I'm here to help. What would you like to talk about?" ); } - - #[test] - fn stream_chunk_parse_gemini() { - const CHUNK_RESPONSE: &str = r#"data: {"choices":[{"delta":{"content":":**\n\n* **Chief Executive:** T"#; - - let iter = SseChatCompletionIter::try_from(CHUNK_RESPONSE.as_bytes()); - - assert!(iter.is_ok(), "Failed to create SSE iterator"); - let iter: SseChatCompletionIter> = iter.unwrap(); - - let all_text: Vec = iter - .map(|item| { - let response = item.expect("Failed to parse response"); - response - .choices - .into_iter() - .filter_map(|choice| choice.delta.content) - .map(|content| content.to_string()) - .collect::() - }) - .collect(); - - assert_eq!( - all_text.len(), - 1, - "Expected 8 chunks of text, but got {}", - all_text.len() - ); - - assert_eq!( - all_text.join(""), - "Hello! How can I assist you today? Whether you have a question, need information, or just want to chat about something, I'm here to help. What would you like to talk about?" - ); - } } diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index d2305ed0..e104cb39 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -570,45 +570,6 @@ impl HttpContext for StreamContext { let hermes_llm_provider = Provider::from(llm_provider_str.as_str()); if self.streaming_response { - // check if body ends with a valid SSE event - if !body.ends_with(b"\n\n") { - if end_of_stream { - warn!("streaming response body does not end with a valid SSE event, but end of stream is true"); - self.send_server_error( - ServerError::LogicError( - "streaming response body does not end with a valid SSE event" - .to_string(), - ), - Some(StatusCode::BAD_REQUEST), - ); - return Action::Continue; - } - - // buffer the body until we have a complete SSE event - debug!("streaming response body does not end with a valid SSE event, buffering the body"); - self.streaming_buffer - .get_or_insert_with(Vec::new) - .extend_from_slice(&body); - // we need to wait for the next chunk to complete the SSE event - return Action::Pause; - } - - // if streaming_buffer is Some, it means we have buffered data from previous chunks - // otherwise we can process the body directly - - // let sse_event_buffer = match self.streaming_buffer.take() { - // Some(buffer) => { - // debug!("streaming response body has buffered data, prepending it to the current chunk"); - // let mut complete_body = buffer; - // complete_body.extend_from_slice(&body); - // complete_body - // } - // None => { - // debug!("no buffered data, processing the current chunk directly"); - // body - // } - // }; - let chat_completions_chunk_response_events = match SseChatCompletionIter::try_from((body.as_slice(), &hermes_llm_provider)) { Ok(events) => events,