Merge branch 'main' into adil/agents_framework

This commit is contained in:
Adil Hafeez 2025-12-17 16:49:38 -08:00
commit 660f8d433f
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
26 changed files with 2692 additions and 93 deletions

View file

@ -59,6 +59,11 @@ pub struct ResponsesAPIStreamBuffer {
model: Option<String>,
created_at: Option<i64>,
/// Full response metadata from upstream (tools, temperature, etc.)
/// This is extracted from the first upstream event and used to build
/// complete response.created and response.in_progress events
upstream_response_metadata: Option<ResponsesAPIResponse>,
/// Lifecycle state flags
created_emitted: bool,
in_progress_emitted: bool,
@ -88,6 +93,7 @@ impl ResponsesAPIStreamBuffer {
response_id: None,
model: None,
created_at: None,
upstream_response_metadata: None,
created_emitted: false,
in_progress_emitted: false,
output_items_added: HashMap::new(),
@ -171,6 +177,15 @@ impl ResponsesAPIStreamBuffer {
/// Build the base response object with current state
fn build_response(&self, status: ResponseStatus) -> ResponsesAPIResponse {
// If we have upstream metadata, use it as a base and update status/output
if let Some(upstream) = &self.upstream_response_metadata {
let mut response = upstream.clone();
response.status = status;
// Don't update output here - will be set in finalize()
return response;
}
// Fallback: build a minimal response from local state
ResponsesAPIResponse {
id: self.response_id.clone().unwrap_or_default(),
object: "response".to_string(),
@ -293,24 +308,40 @@ impl ResponsesAPIStreamBuffer {
// Build final response
let mut output_items = Vec::new();
// Add tool calls to output
for (item_id, arguments) in &self.function_arguments {
let output_index = self.output_items_added.iter()
.find(|(_, id)| *id == item_id)
.map(|(idx, _)| *idx)
.unwrap_or(0);
// Build complete output array by iterating through all output indices in order
let max_output_index = self.output_items_added.keys().max().copied().unwrap_or(-1);
let (call_id, name) = self.tool_call_metadata.get(&output_index)
.cloned()
.unwrap_or_else(|| (format!("call_{}", uuid::Uuid::new_v4()), "unknown".to_string()));
for output_index in 0..=max_output_index {
if let Some(item_id) = self.output_items_added.get(&output_index) {
// Check if this is a function call
if let Some(arguments) = self.function_arguments.get(item_id) {
let (call_id, name) = self.tool_call_metadata.get(&output_index)
.cloned()
.unwrap_or_else(|| (format!("call_{}", uuid::Uuid::new_v4()), "unknown".to_string()));
output_items.push(OutputItem::FunctionCall {
id: item_id.clone(),
status: OutputItemStatus::Completed,
call_id,
name: Some(name),
arguments: Some(arguments.clone()),
});
output_items.push(OutputItem::FunctionCall {
id: item_id.clone(),
status: OutputItemStatus::Completed,
call_id,
name: Some(name),
arguments: Some(arguments.clone()),
});
}
// Check if this is a text message
else if let Some(text) = self.text_content.get(item_id) {
use crate::apis::openai_responses::OutputContent;
output_items.push(OutputItem::Message {
id: item_id.clone(),
status: OutputItemStatus::Completed,
role: "assistant".to_string(),
content: vec![OutputContent::OutputText {
text: text.clone(),
annotations: vec![],
logprobs: None,
}],
});
}
}
}
let mut final_response = self.build_response(ResponseStatus::Completed);
@ -365,6 +396,24 @@ impl SseStreamBufferTrait for ResponsesAPIStreamBuffer {
let mut events = Vec::new();
// Capture upstream metadata from ResponseCreated or ResponseInProgress if present
match stream_event {
ResponsesAPIStreamEvent::ResponseCreated { response, .. } |
ResponsesAPIStreamEvent::ResponseInProgress { response, .. } => {
if self.upstream_response_metadata.is_none() {
// Store the full upstream response as our metadata template
self.upstream_response_metadata = Some(response.clone());
// Also extract basic fields
self.response_id = Some(response.id.clone());
self.model = Some(response.model.clone());
self.created_at = Some(response.created_at);
}
// Don't emit these - we'll generate our own lifecycle events
return;
}
_ => {}
}
// Emit lifecycle events if not yet emitted
if !self.created_emitted {
// Initialize metadata from first event if needed

View file

@ -193,6 +193,40 @@ impl SupportedAPIsFromClient {
}
}
impl SupportedUpstreamAPIs {
    /// Resolve an endpoint path to the upstream API it belongs to.
    ///
    /// Providers are probed in a fixed order: OpenAI first, then Anthropic,
    /// then Amazon Bedrock. Each later probe only runs when the earlier ones
    /// did not recognize the endpoint. Returns `None` when no provider
    /// recognizes `endpoint`.
    pub fn from_endpoint(endpoint: &str) -> Option<Self> {
        OpenAIApi::from_endpoint(endpoint)
            .map(|openai_api| {
                // The Responses endpoint gets its own variant; every other
                // OpenAI endpoint is classified as ChatCompletions.
                if openai_api == OpenAIApi::Responses {
                    SupportedUpstreamAPIs::OpenAIResponsesAPI(openai_api)
                } else {
                    SupportedUpstreamAPIs::OpenAIChatCompletions(openai_api)
                }
            })
            .or_else(|| {
                AnthropicApi::from_endpoint(endpoint)
                    .map(SupportedUpstreamAPIs::AnthropicMessagesAPI)
            })
            .or_else(|| {
                AmazonBedrockApi::from_endpoint(endpoint).map(|bedrock_api| match bedrock_api {
                    AmazonBedrockApi::Converse => {
                        SupportedUpstreamAPIs::AmazonBedrockConverse(bedrock_api)
                    }
                    AmazonBedrockApi::ConverseStream => {
                        SupportedUpstreamAPIs::AmazonBedrockConverseStream(bedrock_api)
                    }
                })
            })
    }
}
/// Get all supported endpoint paths
pub fn supported_endpoints() -> Vec<&'static str> {
let mut endpoints = Vec::new();

View file

@ -1,3 +1,4 @@
//! Response transformation modules
pub mod output_to_input;
pub mod to_anthropic;
pub mod to_openai;

View file

@ -0,0 +1,178 @@
//! Conversions from response outputs to request inputs for conversation continuation
//!
//! This module provides utilities for converting OutputItem types from API responses
//! into InputItem types that can be used in subsequent requests. This is primarily used
//! for maintaining conversation history in the v1/responses API.
use crate::apis::openai_responses::{
InputContent, InputItem, InputMessage, MessageContent, MessageRole, OutputContent, OutputItem,
};
/// Turns a single response `OutputItem` into an `InputItem` suitable for a follow-up request.
///
/// * `OutputItem::Message` — text and audio parts are carried over (audio is emitted with
///   `format: None`), refusal parts are dropped. If nothing remains, `None` is returned.
///   The role string maps `"user"`, `"system"` and `"developer"` to their matching
///   `MessageRole`; every other value (including `"assistant"`) becomes `Assistant`.
/// * `OutputItem::FunctionCall` — rendered as an assistant text message describing the call.
/// * Any other variant yields `None`.
pub fn convert_responses_output_to_input_items(output: &OutputItem) -> Option<InputItem> {
    let message = match output {
        OutputItem::Message { role, content, .. } => {
            let mut parts: Vec<InputContent> = Vec::new();
            for piece in content {
                match piece {
                    OutputContent::OutputText { text, .. } => {
                        parts.push(InputContent::InputText { text: text.clone() });
                    }
                    OutputContent::OutputAudio { data, .. } => {
                        parts.push(InputContent::InputAudio {
                            data: data.clone(),
                            // The output side does not carry the audio format.
                            format: None,
                        });
                    }
                    // Refusals are not replayed as input.
                    OutputContent::Refusal { .. } => {}
                }
            }

            // A message with no convertible parts produces no input item.
            if parts.is_empty() {
                return None;
            }

            let mapped_role = match role.as_str() {
                "user" => MessageRole::User,
                "system" => MessageRole::System,
                "developer" => MessageRole::Developer,
                // "assistant" and any unrecognized role both resolve to Assistant.
                _ => MessageRole::Assistant,
            };

            InputMessage {
                role: mapped_role,
                content: MessageContent::Items(parts),
            }
        }
        OutputItem::FunctionCall {
            name, arguments, ..
        } => {
            // Function calls are summarized as plain assistant text.
            let summary = match (name, arguments) {
                (Some(n), Some(args)) => {
                    format!("Called function: {} with arguments: {}", n, args)
                }
                _ => "Called a function".to_string(),
            };

            InputMessage {
                role: MessageRole::Assistant,
                content: MessageContent::Items(vec![InputContent::InputText { text: summary }]),
            }
        }
        // Remaining output kinds have no input representation here.
        _ => return None,
    };

    Some(InputItem::Message(message))
}
/// Converts a Vec of OutputItems into InputItems for conversation continuation
pub fn outputs_to_inputs(outputs: &[OutputItem]) -> Vec<InputItem> {
outputs
.iter()
.filter_map(convert_responses_output_to_input_items)
.collect()
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::apis::openai_responses::OutputItemStatus;

    /// Builds a completed assistant message holding a single text part.
    fn assistant_text(id: &str, text: &str) -> OutputItem {
        OutputItem::Message {
            id: id.to_string(),
            status: OutputItemStatus::Completed,
            role: "assistant".to_string(),
            content: vec![OutputContent::OutputText {
                text: text.to_string(),
                annotations: vec![],
                logprobs: None,
            }],
        }
    }

    /// Builds a completed function call with the given name and arguments.
    fn completed_call(id: &str, call_id: &str, name: &str, arguments: &str) -> OutputItem {
        OutputItem::FunctionCall {
            id: id.to_string(),
            status: OutputItemStatus::Completed,
            call_id: call_id.to_string(),
            name: Some(name.to_string()),
            arguments: Some(arguments.to_string()),
        }
    }

    /// Returns the inner message, panicking on any other input variant.
    fn expect_message(input: &InputItem) -> &InputMessage {
        match input {
            InputItem::Message(msg) => msg,
            _ => panic!("Expected Message variant"),
        }
    }

    /// Returns the message's content items, panicking on any other content shape.
    fn expect_items(msg: &InputMessage) -> &[InputContent] {
        match &msg.content {
            MessageContent::Items(items) => items.as_slice(),
            _ => panic!("Expected MessageContent::Items"),
        }
    }

    #[test]
    fn test_output_message_to_input() {
        let output = assistant_text("msg_123", "Hello!");

        let input = convert_responses_output_to_input_items(&output).unwrap();

        let msg = expect_message(&input);
        assert!(matches!(msg.role, MessageRole::Assistant));

        let items = expect_items(msg);
        assert_eq!(items.len(), 1);
        match &items[0] {
            InputContent::InputText { text } => assert_eq!(text, "Hello!"),
            _ => panic!("Expected InputText"),
        }
    }

    #[test]
    fn test_function_call_to_input() {
        let output = completed_call("fc_123", "call_123", "get_weather", r#"{"location":"SF"}"#);

        let input = convert_responses_output_to_input_items(&output).unwrap();

        let msg = expect_message(&input);
        assert!(matches!(msg.role, MessageRole::Assistant));

        let items = expect_items(msg);
        match &items[0] {
            InputContent::InputText { text } => {
                assert!(text.contains("get_weather"));
            }
            _ => panic!("Expected InputText"),
        }
    }

    #[test]
    fn test_outputs_to_inputs() {
        let outputs = vec![
            assistant_text("msg_1", "Hello"),
            completed_call("fc_1", "call_1", "test", "{}"),
        ];

        let inputs = outputs_to_inputs(&outputs);
        assert_eq!(inputs.len(), 2);
    }
}

View file

@ -80,8 +80,19 @@ impl TryFrom<ChatCompletionsResponse> for ResponsesAPIResponse {
// Only add the message item if there's actual content (text, audio, or refusal)
// Don't add empty message items when there are only tool calls
if !content.is_empty() {
// Generate message ID: strip common prefixes to avoid double-prefixing
let message_id = if resp.id.starts_with("msg_") {
resp.id.clone()
} else if resp.id.starts_with("resp_") {
format!("msg_{}", &resp.id[5..]) // Strip "resp_" prefix
} else if resp.id.starts_with("chatcmpl-") {
format!("msg_{}", &resp.id[9..]) // Strip "chatcmpl-" prefix
} else {
format!("msg_{}", resp.id)
};
items.push(OutputItem::Message {
id: format!("msg_{}", resp.id),
id: message_id,
status: OutputItemStatus::Completed,
role: match choice.message.role {
Role::User => "user".to_string(),
@ -151,7 +162,12 @@ impl TryFrom<ChatCompletionsResponse> for ResponsesAPIResponse {
};
Ok(ResponsesAPIResponse {
id: resp.id,
// Generate proper resp_ prefixed ID if not already present
id: if resp.id.starts_with("resp_") {
resp.id
} else {
format!("resp_{}", uuid::Uuid::new_v4().to_string().replace("-", ""))
},
object: "response".to_string(),
created_at: resp.created as i64,
status,
@ -942,7 +958,7 @@ mod tests {
use crate::apis::openai_responses::{OutputContent, OutputItem, ResponsesAPIResponse};
let chat_response = ChatCompletionsResponse {
id: "chatcmpl-123".to_string(),
id: "resp_6de5512800cf4375a329a473a4f02879".to_string(),
object: Some("chat.completion".to_string()),
created: 1677652288,
model: "gpt-4".to_string(),
@ -974,7 +990,9 @@ mod tests {
let responses_api: ResponsesAPIResponse = chat_response.try_into().unwrap();
assert_eq!(responses_api.id, "chatcmpl-123");
// Response ID should be generated with resp_ prefix
assert!(responses_api.id.starts_with("resp_"), "Response ID should start with 'resp_'");
assert_eq!(responses_api.id.len(), 37, "Response ID should be resp_ + 32 char UUID");
assert_eq!(responses_api.object, "response");
assert_eq!(responses_api.model, "gpt-4");

View file

@ -58,11 +58,11 @@ impl TryFrom<MessagesStreamEvent> for ChatCompletionsStreamResponse {
None,
)),
MessagesStreamEvent::ContentBlockStart { content_block, .. } => {
convert_content_block_start(content_block)
MessagesStreamEvent::ContentBlockStart { content_block, index } => {
convert_content_block_start(content_block, index)
}
MessagesStreamEvent::ContentBlockDelta { delta, .. } => convert_content_delta(delta),
MessagesStreamEvent::ContentBlockDelta { delta, index } => convert_content_delta(delta, index),
MessagesStreamEvent::ContentBlockStop { .. } => Ok(create_empty_openai_chunk()),
@ -272,6 +272,7 @@ impl TryFrom<ConverseStreamEvent> for ChatCompletionsStreamResponse {
/// Convert content block start to OpenAI chunk
fn convert_content_block_start(
content_block: MessagesContentBlock,
index: u32,
) -> Result<ChatCompletionsStreamResponse, TransformError> {
match content_block {
MessagesContentBlock::Text { .. } => {
@ -291,7 +292,7 @@ fn convert_content_block_start(
refusal: None,
function_call: None,
tool_calls: Some(vec![ToolCallDelta {
index: 0,
index,
id: Some(id),
call_type: Some("function".to_string()),
function: Some(FunctionCallDelta {
@ -313,6 +314,7 @@ fn convert_content_block_start(
/// Convert content delta to OpenAI chunk
fn convert_content_delta(
delta: MessagesContentDelta,
index: u32,
) -> Result<ChatCompletionsStreamResponse, TransformError> {
match delta {
MessagesContentDelta::TextDelta { text } => Ok(create_openai_chunk(
@ -350,7 +352,7 @@ fn convert_content_delta(
refusal: None,
function_call: None,
tool_calls: Some(vec![ToolCallDelta {
index: 0,
index,
id: None,
call_type: None,
function: Some(FunctionCallDelta {