mirror of
https://github.com/katanemo/plano.git
synced 2026-05-14 10:32:41 +02:00
Add support for streaming and fixes few issues (see description) (#202)
This commit is contained in:
parent
29ff8da60f
commit
662a840ac5
45 changed files with 2266 additions and 477 deletions
|
|
@ -3,14 +3,14 @@ use std::{collections::HashMap, time::Duration};
|
|||
use common::{
|
||||
common_types::{
|
||||
open_ai::{
|
||||
ArchState, ChatCompletionsRequest, ChatCompletionsResponse, Message, StreamOptions,
|
||||
to_server_events, ArchState, ChatCompletionStreamResponse, ChatCompletionsRequest,
|
||||
},
|
||||
PromptGuardRequest, PromptGuardTask,
|
||||
},
|
||||
consts::{
|
||||
ARCH_FC_MODEL_NAME, ARCH_INTERNAL_CLUSTER_NAME, ARCH_STATE_HEADER,
|
||||
ARCH_UPSTREAM_HOST_HEADER, ASSISTANT_ROLE, CHAT_COMPLETIONS_PATH, GUARD_INTERNAL_HOST,
|
||||
REQUEST_ID_HEADER, TOOL_ROLE, USER_ROLE,
|
||||
HEALTHZ_PATH, REQUEST_ID_HEADER, TOOL_ROLE, USER_ROLE,
|
||||
},
|
||||
errors::ServerError,
|
||||
http::{CallArgs, Client},
|
||||
|
|
@ -33,8 +33,17 @@ impl HttpContext for StreamContext {
|
|||
// manipulate the body in benign ways e.g., compression.
|
||||
self.set_http_request_header("content-length", None);
|
||||
|
||||
self.is_chat_completions_request =
|
||||
self.get_http_request_header(":path").unwrap_or_default() == CHAT_COMPLETIONS_PATH;
|
||||
let request_path = self.get_http_request_header(":path").unwrap_or_default();
|
||||
if request_path == HEALTHZ_PATH {
|
||||
if self.embeddings_store.is_none() {
|
||||
self.send_http_response(503, vec![], None);
|
||||
} else {
|
||||
self.send_http_response(200, vec![], None);
|
||||
}
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
self.is_chat_completions_request = request_path == CHAT_COMPLETIONS_PATH;
|
||||
|
||||
trace!(
|
||||
"on_http_request_headers S[{}] req_headers={:?}",
|
||||
|
|
@ -80,21 +89,23 @@ impl HttpContext for StreamContext {
|
|||
}
|
||||
};
|
||||
|
||||
debug!("developer => archgw: {}", String::from_utf8_lossy(&body_bytes));
|
||||
debug!(
|
||||
"developer => archgw: {}",
|
||||
String::from_utf8_lossy(&body_bytes)
|
||||
);
|
||||
|
||||
// Deserialize body into spec.
|
||||
// Currently OpenAI API.
|
||||
let mut deserialized_body: ChatCompletionsRequest =
|
||||
match serde_json::from_slice(&body_bytes) {
|
||||
Ok(deserialized) => deserialized,
|
||||
Err(e) => {
|
||||
self.send_server_error(
|
||||
ServerError::Deserialization(e),
|
||||
Some(StatusCode::BAD_REQUEST),
|
||||
);
|
||||
return Action::Pause;
|
||||
}
|
||||
};
|
||||
let deserialized_body: ChatCompletionsRequest = match serde_json::from_slice(&body_bytes) {
|
||||
Ok(deserialized) => deserialized,
|
||||
Err(e) => {
|
||||
self.send_server_error(
|
||||
ServerError::Deserialization(e),
|
||||
Some(StatusCode::BAD_REQUEST),
|
||||
);
|
||||
return Action::Pause;
|
||||
}
|
||||
};
|
||||
|
||||
self.arch_state = match deserialized_body.metadata {
|
||||
Some(ref metadata) => {
|
||||
|
|
@ -110,11 +121,6 @@ impl HttpContext for StreamContext {
|
|||
};
|
||||
|
||||
self.streaming_response = deserialized_body.stream;
|
||||
if deserialized_body.stream && deserialized_body.stream_options.is_none() {
|
||||
deserialized_body.stream_options = Some(StreamOptions {
|
||||
include_usage: true,
|
||||
});
|
||||
}
|
||||
|
||||
let last_user_prompt = match deserialized_body
|
||||
.messages
|
||||
|
|
@ -235,105 +241,111 @@ impl HttpContext for StreamContext {
|
|||
);
|
||||
|
||||
if !self.is_chat_completions_request {
|
||||
if let Some(body_str) = self
|
||||
.get_http_response_body(0, body_size)
|
||||
.and_then(|bytes| String::from_utf8(bytes).ok())
|
||||
{
|
||||
debug!("recv [S={}] body_str={}", self.context_id, body_str);
|
||||
}
|
||||
debug!("non-streaming request");
|
||||
return Action::Continue;
|
||||
}
|
||||
|
||||
if !end_of_stream {
|
||||
return Action::Pause;
|
||||
}
|
||||
let body = if self.streaming_response {
|
||||
let streaming_chunk = match self.get_http_response_body(0, body_size) {
|
||||
Some(chunk) => chunk,
|
||||
None => {
|
||||
warn!(
|
||||
"response body empty, chunk_start: {}, chunk_size: {}",
|
||||
0, body_size
|
||||
);
|
||||
return Action::Continue;
|
||||
}
|
||||
};
|
||||
|
||||
let body = self
|
||||
.get_http_response_body(0, body_size)
|
||||
.expect("cant get response body");
|
||||
if streaming_chunk.len() != body_size {
|
||||
warn!(
|
||||
"chunk size mismatch: read: {} != requested: {}",
|
||||
streaming_chunk.len(),
|
||||
body_size
|
||||
);
|
||||
}
|
||||
|
||||
streaming_chunk
|
||||
} else {
|
||||
debug!("non streaming response bytes read: 0:{}", body_size);
|
||||
match self.get_http_response_body(0, body_size) {
|
||||
Some(body) => body,
|
||||
None => {
|
||||
warn!("non streaming response body empty");
|
||||
return Action::Continue;
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let body_utf8 = match String::from_utf8(body) {
|
||||
Ok(body_utf8) => body_utf8,
|
||||
Err(e) => {
|
||||
debug!("could not convert to utf8: {}", e);
|
||||
return Action::Continue;
|
||||
}
|
||||
};
|
||||
|
||||
if self.streaming_response {
|
||||
trace!("streaming response");
|
||||
} else {
|
||||
trace!("non streaming response");
|
||||
let chat_completions_response: ChatCompletionsResponse =
|
||||
match serde_json::from_slice(&body) {
|
||||
Ok(de) => de,
|
||||
Err(e) => {
|
||||
trace!(
|
||||
"invalid response: {}, {}",
|
||||
String::from_utf8_lossy(&body),
|
||||
e
|
||||
);
|
||||
return Action::Continue;
|
||||
}
|
||||
};
|
||||
|
||||
if chat_completions_response.usage.is_some() {
|
||||
self.response_tokens += chat_completions_response
|
||||
.usage
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.completion_tokens;
|
||||
if self.tool_calls.is_some() && !self.tool_calls.as_ref().unwrap().is_empty() {
|
||||
let chunks = vec![
|
||||
ChatCompletionStreamResponse::new(
|
||||
None,
|
||||
Some(ASSISTANT_ROLE.to_string()),
|
||||
Some(ARCH_FC_MODEL_NAME.to_string()),
|
||||
self.tool_calls.to_owned(),
|
||||
),
|
||||
ChatCompletionStreamResponse::new(
|
||||
self.tool_call_response.clone(),
|
||||
Some(TOOL_ROLE.to_string()),
|
||||
Some(ARCH_FC_MODEL_NAME.to_string()),
|
||||
None,
|
||||
),
|
||||
];
|
||||
|
||||
let mut response_str = to_server_events(chunks);
|
||||
// append the original response from the model to the stream
|
||||
response_str.push_str(&body_utf8);
|
||||
self.set_http_response_body(0, body_size, response_str.as_bytes());
|
||||
self.tool_calls = None;
|
||||
}
|
||||
} else if let Some(tool_calls) = self.tool_calls.as_ref() {
|
||||
if !tool_calls.is_empty() {
|
||||
if self.arch_state.is_none() {
|
||||
self.arch_state = Some(Vec::new());
|
||||
}
|
||||
|
||||
if let Some(tool_calls) = self.tool_calls.as_ref() {
|
||||
if !tool_calls.is_empty() {
|
||||
if self.arch_state.is_none() {
|
||||
self.arch_state = Some(Vec::new());
|
||||
let mut data = serde_json::from_str(&body_utf8).unwrap();
|
||||
// use serde::Value to manipulate the json object and ensure that we don't lose any data
|
||||
if let Value::Object(ref mut map) = data {
|
||||
// serialize arch state and add to metadata
|
||||
let metadata = map
|
||||
.entry("metadata")
|
||||
.or_insert(Value::Object(serde_json::Map::new()));
|
||||
if metadata == &Value::Null {
|
||||
*metadata = Value::Object(serde_json::Map::new());
|
||||
}
|
||||
|
||||
let mut data = serde_json::from_slice(&body).unwrap();
|
||||
// use serde::Value to manipulate the json object and ensure that we don't lose any data
|
||||
if let Value::Object(ref mut map) = data {
|
||||
// serialize arch state and add to metadata
|
||||
let metadata = map
|
||||
.entry("metadata")
|
||||
.or_insert(Value::Object(serde_json::Map::new()));
|
||||
if metadata == &Value::Null {
|
||||
*metadata = Value::Object(serde_json::Map::new());
|
||||
}
|
||||
|
||||
// since arch gateway generates tool calls (using arch-fc) and calls upstream api to
|
||||
// get response, we will send these back to developer so they can see the api response
|
||||
// and tool call arch-fc generated
|
||||
let fc_messages = vec![
|
||||
Message {
|
||||
role: ASSISTANT_ROLE.to_string(),
|
||||
content: None,
|
||||
model: Some(ARCH_FC_MODEL_NAME.to_string()),
|
||||
tool_calls: self.tool_calls.clone(),
|
||||
tool_call_id: None,
|
||||
},
|
||||
Message {
|
||||
role: TOOL_ROLE.to_string(),
|
||||
content: self.tool_call_response.clone(),
|
||||
model: None,
|
||||
tool_calls: None,
|
||||
tool_call_id: Some(self.tool_calls.as_ref().unwrap()[0].id.clone()),
|
||||
},
|
||||
];
|
||||
let fc_messages_str = serde_json::to_string(&fc_messages).unwrap();
|
||||
let arch_state = HashMap::from([("messages".to_string(), fc_messages_str)]);
|
||||
let arch_state_str = serde_json::to_string(&arch_state).unwrap();
|
||||
metadata.as_object_mut().unwrap().insert(
|
||||
ARCH_STATE_HEADER.to_string(),
|
||||
serde_json::Value::String(arch_state_str),
|
||||
);
|
||||
let data_serialized = serde_json::to_string(&data).unwrap();
|
||||
debug!("archgw <= developer: {}", data_serialized);
|
||||
self.set_http_response_body(0, body_size, data_serialized.as_bytes());
|
||||
};
|
||||
}
|
||||
let fc_messages = vec![
|
||||
self.generate_toll_call_message(),
|
||||
self.generate_api_response_message(),
|
||||
];
|
||||
let fc_messages_str = serde_json::to_string(&fc_messages).unwrap();
|
||||
let arch_state = HashMap::from([("messages".to_string(), fc_messages_str)]);
|
||||
let arch_state_str = serde_json::to_string(&arch_state).unwrap();
|
||||
metadata.as_object_mut().unwrap().insert(
|
||||
ARCH_STATE_HEADER.to_string(),
|
||||
serde_json::Value::String(arch_state_str),
|
||||
);
|
||||
let data_serialized = serde_json::to_string(&data).unwrap();
|
||||
debug!("archgw <= developer: {}", data_serialized);
|
||||
self.set_http_response_body(0, body_size, data_serialized.as_bytes());
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
trace!(
|
||||
"recv [S={}] total_tokens={} end_stream={}",
|
||||
self.context_id,
|
||||
self.response_tokens,
|
||||
end_of_stream
|
||||
);
|
||||
trace!("recv [S={}] end_stream={}", self.context_id, end_of_stream);
|
||||
|
||||
Action::Continue
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue