mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
cargo fmt
This commit is contained in:
parent
a2dc311e7d
commit
54897e5a56
4 changed files with 40 additions and 35 deletions
|
|
@ -124,7 +124,11 @@ pub async fn llm_chat(
|
|||
// Only process state if state_storage is configured
|
||||
let mut should_manage_state = false;
|
||||
if is_responses_api_client {
|
||||
if let (ProviderRequestType::ResponsesAPIRequest(ref mut responses_req), Some(ref state_store)) = (&mut client_request, &state_storage) {
|
||||
if let (
|
||||
ProviderRequestType::ResponsesAPIRequest(ref mut responses_req),
|
||||
Some(ref state_store),
|
||||
) = (&mut client_request, &state_storage)
|
||||
{
|
||||
// Extract original input once
|
||||
original_input_items = extract_input_items(&responses_req.input);
|
||||
|
||||
|
|
@ -292,31 +296,34 @@ pub async fn llm_chat(
|
|||
|
||||
// === v1/responses state management: Wrap with ResponsesStateProcessor ===
|
||||
// Only wrap if we need to manage state (client is ResponsesAPI AND upstream is NOT ResponsesAPI AND state_storage is configured)
|
||||
let streaming_response =
|
||||
if let (true, false, Some(state_store)) = (should_manage_state, original_input_items.is_empty(), state_storage) {
|
||||
// Extract Content-Encoding header to handle decompression for state parsing
|
||||
let content_encoding = response_headers
|
||||
.get("content-encoding")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
let streaming_response = if let (true, false, Some(state_store)) = (
|
||||
should_manage_state,
|
||||
original_input_items.is_empty(),
|
||||
state_storage,
|
||||
) {
|
||||
// Extract Content-Encoding header to handle decompression for state parsing
|
||||
let content_encoding = response_headers
|
||||
.get("content-encoding")
|
||||
.and_then(|v| v.to_str().ok())
|
||||
.map(|s| s.to_string());
|
||||
|
||||
// Wrap with state management processor to store state after response completes
|
||||
let state_processor = ResponsesStateProcessor::new(
|
||||
base_processor,
|
||||
state_store,
|
||||
original_input_items,
|
||||
resolved_model.clone(),
|
||||
model_name.clone(),
|
||||
is_streaming_request,
|
||||
false, // Not OpenAI upstream since should_manage_state is true
|
||||
content_encoding,
|
||||
request_id.clone(),
|
||||
);
|
||||
create_streaming_response(byte_stream, state_processor, 16)
|
||||
} else {
|
||||
// Use base processor without state management
|
||||
create_streaming_response(byte_stream, base_processor, 16)
|
||||
};
|
||||
// Wrap with state management processor to store state after response completes
|
||||
let state_processor = ResponsesStateProcessor::new(
|
||||
base_processor,
|
||||
state_store,
|
||||
original_input_items,
|
||||
resolved_model.clone(),
|
||||
model_name.clone(),
|
||||
is_streaming_request,
|
||||
false, // Not OpenAI upstream since should_manage_state is true
|
||||
content_encoding,
|
||||
request_id.clone(),
|
||||
);
|
||||
create_streaming_response(byte_stream, state_processor, 16)
|
||||
} else {
|
||||
// Use base processor without state management
|
||||
create_streaming_response(byte_stream, base_processor, 16)
|
||||
};
|
||||
|
||||
match response.body(streaming_response.body) {
|
||||
Ok(response) => Ok(response),
|
||||
|
|
|
|||
|
|
@ -133,9 +133,7 @@ impl ResponseHandler {
|
|||
let response_headers = llm_response.headers();
|
||||
let is_sse_streaming = response_headers
|
||||
.get(hyper::header::CONTENT_TYPE)
|
||||
.is_some_and(|v| {
|
||||
v.to_str().unwrap_or("").contains("text/event-stream")
|
||||
});
|
||||
.is_some_and(|v| v.to_str().unwrap_or("").contains("text/event-stream"));
|
||||
|
||||
let response_bytes = llm_response
|
||||
.bytes()
|
||||
|
|
|
|||
|
|
@ -80,9 +80,9 @@ impl TryFrom<(&SupportedAPIsFromClient, &SupportedUpstreamAPIs)> for SseStreamBu
|
|||
SupportedAPIsFromClient::AnthropicMessagesAPI(_) => Ok(
|
||||
SseStreamBuffer::AnthropicMessages(AnthropicMessagesStreamBuffer::new()),
|
||||
),
|
||||
SupportedAPIsFromClient::OpenAIResponsesAPI(_) => Ok(SseStreamBuffer::OpenAIResponses(
|
||||
Box::default(),
|
||||
)),
|
||||
SupportedAPIsFromClient::OpenAIResponsesAPI(_) => {
|
||||
Ok(SseStreamBuffer::OpenAIResponses(Box::default()))
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -141,7 +141,8 @@ impl HttpContext for StreamContext {
|
|||
|
||||
let last_user_prompt = match deserialized_body
|
||||
.messages
|
||||
.iter().rfind(|msg| msg.role == USER_ROLE)
|
||||
.iter()
|
||||
.rfind(|msg| msg.role == USER_ROLE)
|
||||
{
|
||||
Some(content) => content,
|
||||
None => {
|
||||
|
|
@ -153,9 +154,8 @@ impl HttpContext for StreamContext {
|
|||
self.user_prompt = Some(last_user_prompt.clone());
|
||||
|
||||
// convert prompt targets to ChatCompletionTool
|
||||
let tool_calls: Vec<ChatCompletionTool> = self
|
||||
.prompt_targets.values().map(|pt| pt.into())
|
||||
.collect();
|
||||
let tool_calls: Vec<ChatCompletionTool> =
|
||||
self.prompt_targets.values().map(|pt| pt.into()).collect();
|
||||
|
||||
let mut metadata = deserialized_body.metadata.clone();
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue