use streaming to respond

This commit is contained in:
Adil Hafeez 2025-05-30 02:04:38 -07:00
parent e3ef0ea6b1
commit c99d64d995
No known key found for this signature in database
GPG key ID: 9B18EF7691369645

View file

@ -116,62 +116,40 @@ pub async fn chat_completions(
headers.insert(header_name, header_value.clone()); headers.insert(header_name, header_value.clone());
} }
if chat_completion_request.stream { // channel to create async stream
// channel to create async stream let (tx, rx) = mpsc::channel::<Bytes>(16);
let (tx, rx) = mpsc::channel::<Bytes>(16);
// Spawn a task to send data as it becomes available // Spawn a task to send data as it becomes available
tokio::spawn(async move { tokio::spawn(async move {
let mut byte_stream = llm_response.bytes_stream(); let mut byte_stream = llm_response.bytes_stream();
while let Some(item) = byte_stream.next().await { while let Some(item) = byte_stream.next().await {
let item = match item { let item = match item {
Ok(item) => item, Ok(item) => item,
Err(err) => { Err(err) => {
warn!("Error receiving chunk: {:?}", err); warn!("Error receiving chunk: {:?}", err);
break;
}
};
if tx.send(item).await.is_err() {
warn!("Receiver dropped");
break; break;
} }
} };
});
let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk))); if tx.send(item).await.is_err() {
warn!("Receiver dropped");
let stream_body = BoxBody::new(StreamBody::new(stream)); break;
match response.body(stream_body) {
Ok(response) => Ok(response),
Err(err) => {
let err_msg = format!("Failed to create response: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(internal_error)
} }
} }
} else { });
let body_raw = match llm_response.bytes().await {
Ok(body) => body,
Err(err) => {
let err_msg = format!("Failed to read response: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return Ok(internal_error);
}
};
match response.body(full(body_raw)) { let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk)));
Ok(response) => Ok(response),
Err(err) => { let stream_body = BoxBody::new(StreamBody::new(stream));
let err_msg = format!("Failed to create response: {}", err);
let mut internal_error = Response::new(full(err_msg)); match response.body(stream_body) {
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; Ok(response) => Ok(response),
Ok(internal_error) Err(err) => {
} let err_msg = format!("Failed to create response: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(internal_error)
} }
} }
} }