Resetting instead of resuming (#39)

Signed-off-by: José Ulises Niño Rivera <junr03@users.noreply.github.com>
This commit is contained in:
José Ulises Niño Rivera 2024-09-14 10:58:25 -07:00 committed by GitHub
parent 33a1a68218
commit 8565462ec4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -85,12 +85,20 @@ impl StreamContext {
});
}
fn send_server_error(&mut self, error: String) {
debug!("server error occurred: {}", error);
self.send_http_response(
StatusCode::INTERNAL_SERVER_ERROR.as_u16().into(),
vec![],
Some(error.as_bytes()),
)
}
fn embeddings_handler(&mut self, body: Vec<u8>, mut callout_context: CallContext) {
let embedding_response: CreateEmbeddingResponse = match serde_json::from_slice(&body) {
Ok(embedding_response) => embedding_response,
Err(e) => {
warn!("Error deserializing embedding response: {:?}", e);
self.resume_http_request();
self.send_server_error(format!("Error deserializing embedding response: {:?}", e));
return;
}
};
@ -104,8 +112,7 @@ impl StreamContext {
let json_data: String = match serde_json::to_string(&search_points_request) {
Ok(json_data) => json_data,
Err(e) => {
warn!("Error serializing search_points_request: {:?}", e);
self.reset_http_request();
self.send_server_error(format!("Error serializing search_points_request: {:?}", e));
return;
}
};
@ -142,8 +149,11 @@ impl StreamContext {
let search_points_response: SearchPointsResponse = match serde_json::from_slice(&body) {
Ok(search_points_response) => search_points_response,
Err(e) => {
warn!("Error deserializing search_points_response: {:?}", e);
self.resume_http_request();
self.send_server_error(format!(
"Error deserializing search_points_response: {:?}",
e
));
return;
}
};
@ -186,8 +196,7 @@ impl StreamContext {
{
Ok(prompt_target) => prompt_target,
Err(e) => {
warn!("Error deserializing prompt_target: {:?}", e);
self.resume_http_request();
self.send_server_error(format!("Error deserializing prompt_target: {:?}", e));
return;
}
};
@ -238,8 +247,10 @@ impl StreamContext {
msg_body
}
Err(e) => {
warn!("Error serializing request_params: {:?}", e);
self.resume_http_request();
self.send_server_error(format!(
"Error serializing request_params: {:?}",
e
));
return;
}
};
@ -424,8 +435,7 @@ impl StreamContext {
let json_string = match serde_json::to_string(&request_message) {
Ok(json_string) => json_string,
Err(e) => {
warn!("Error serializing request_body: {:?}", e);
self.resume_http_request();
self.send_server_error(format!("Error serializing request_body: {:?}", e));
return;
}
};
@ -600,30 +610,19 @@ impl Context for StreamContext {
let callout_context = self.callouts.remove(&token_id).expect("invalid token_id");
self.metrics.active_http_calls.increment(-1);
let resp = self.get_http_call_response_body(0, body_size);
if resp.is_none() {
warn!("No response body");
self.resume_http_request();
return;
}
let body = match resp {
Some(body) => body,
None => {
warn!("Empty response body");
self.resume_http_request();
return;
}
};
match callout_context.request_type {
RequestType::GetEmbedding => self.embeddings_handler(body, callout_context),
RequestType::SearchPoints => self.search_points_handler(body, callout_context),
RequestType::FunctionResolver => self.function_resolver_handler(body, callout_context),
RequestType::FunctionCallResponse => {
self.function_call_response_handler(body, callout_context)
if let Some(body) = self.get_http_call_response_body(0, body_size) {
match callout_context.request_type {
RequestType::GetEmbedding => self.embeddings_handler(body, callout_context),
RequestType::SearchPoints => self.search_points_handler(body, callout_context),
RequestType::FunctionResolver => {
self.function_resolver_handler(body, callout_context)
}
RequestType::FunctionCallResponse => {
self.function_call_response_handler(body, callout_context)
}
}
} else {
warn!("No response body in inline HTTP request");
}
}
}