Support for Codex via Plano (#808)

* Add Codex CLI support; xAI response improvements

* Add native Plano running check and update CLI agent error handling

* Apply PR suggestions for transformations and code quality

* Refactor message extraction logic in ResponsesAPIRequest

* xAI support for Responses API by routing to native endpoint + refactor code
This commit is contained in:
Musa 2026-03-10 20:54:14 -07:00 committed by GitHub
parent 5189f7907a
commit 6610097659
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 1297 additions and 200 deletions

View file

@ -108,7 +108,7 @@ pub struct ChatCompletionsRequest {
pub top_p: Option<f32>,
pub top_logprobs: Option<u32>,
pub user: Option<String>,
// pub web_search: Option<bool>, // GOOD FIRST ISSUE: Future support for web search
pub web_search_options: Option<Value>,
// VLLM-specific parameters (used by Arch-Function)
pub top_k: Option<u32>,

View file

@ -116,6 +116,8 @@ pub enum InputParam {
Text(String),
/// Array of input items (messages, references, outputs, etc.)
Items(Vec<InputItem>),
/// Single input item (some clients send object instead of array)
SingleItem(InputItem),
}
/// Input item - can be a message, item reference, function call output, etc.
@ -130,12 +132,20 @@ pub enum InputItem {
item_type: String,
id: String,
},
/// Function call emitted by model in prior turn
FunctionCall {
#[serde(rename = "type")]
item_type: String,
name: String,
arguments: String,
call_id: String,
},
/// Function call output
FunctionCallOutput {
#[serde(rename = "type")]
item_type: String,
call_id: String,
output: String,
output: serde_json::Value,
},
}
@ -166,6 +176,7 @@ pub enum MessageRole {
Assistant,
System,
Developer,
Tool,
}
/// Input content types
@ -173,6 +184,7 @@ pub enum MessageRole {
#[serde(tag = "type", rename_all = "snake_case")]
pub enum InputContent {
/// Text input
#[serde(rename = "input_text", alias = "text", alias = "output_text")]
InputText { text: String },
/// Image input via URL
InputImage {
@ -180,6 +192,7 @@ pub enum InputContent {
detail: Option<String>,
},
/// File input via URL
#[serde(rename = "input_file", alias = "file")]
InputFile { file_url: String },
/// Audio input
InputAudio {
@ -207,10 +220,11 @@ pub struct AudioConfig {
}
/// Text configuration
#[skip_serializing_none]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TextConfig {
/// Text format configuration
pub format: TextFormat,
pub format: Option<TextFormat>,
}
/// Text format
@ -285,6 +299,7 @@ pub enum Tool {
filters: Option<serde_json::Value>,
},
/// Web search tool
#[serde(rename = "web_search", alias = "web_search_preview")]
WebSearchPreview {
domains: Option<Vec<String>>,
search_context_size: Option<String>,
@ -298,6 +313,12 @@ pub enum Tool {
display_height_px: Option<i32>,
display_number: Option<i32>,
},
/// Custom tool (provider/SDK-specific tool contract)
Custom {
name: Option<String>,
description: Option<String>,
format: Option<serde_json::Value>,
},
}
/// Ranking options for file search
@ -1015,6 +1036,30 @@ pub struct ListInputItemsResponse {
// ProviderRequest Implementation
// ============================================================================
/// Appends a plain-text rendering of a single `InputContent` to `buffer`.
/// Non-text content (image/file/audio) is represented by a bracketed
/// placeholder so the extracted text still reflects its presence.
fn append_input_content_text(buffer: &mut String, content: &InputContent) {
    let rendered = match content {
        InputContent::InputText { text } => text.as_str(),
        InputContent::InputImage { .. } => "[Image]",
        InputContent::InputFile { .. } => "[File]",
        InputContent::InputAudio { .. } => "[Audio]",
    };
    buffer.push_str(rendered);
}
/// Appends the textual rendering of every content item in `content_items`
/// to `buffer`. Each item is preceded by a single space to preserve the
/// legacy concatenation format of the extraction output.
fn append_content_items_text(buffer: &mut String, content_items: &[InputContent]) {
    content_items.iter().for_each(|item| {
        // Space prefix per item keeps parity with the pre-refactor behavior.
        buffer.push(' ');
        append_input_content_text(buffer, item);
    });
}
/// Appends a textual rendering of a message's content to `buffer`,
/// handling both the plain-text and the structured (item-list) forms.
fn append_message_content_text(buffer: &mut String, content: &MessageContent) {
    match content {
        MessageContent::Items(items) => append_content_items_text(buffer, items),
        MessageContent::Text(raw) => buffer.push_str(raw),
    }
}
impl ProviderRequest for ResponsesAPIRequest {
fn model(&self) -> &str {
&self.model
@ -1031,36 +1076,27 @@ impl ProviderRequest for ResponsesAPIRequest {
fn extract_messages_text(&self) -> String {
match &self.input {
InputParam::Text(text) => text.clone(),
InputParam::Items(items) => {
items.iter().fold(String::new(), |acc, item| {
match item {
InputItem::Message(msg) => {
let content_text = match &msg.content {
MessageContent::Text(text) => text.clone(),
MessageContent::Items(content_items) => {
content_items.iter().fold(String::new(), |acc, content| {
acc + " "
+ &match content {
InputContent::InputText { text } => text.clone(),
InputContent::InputImage { .. } => {
"[Image]".to_string()
}
InputContent::InputFile { .. } => {
"[File]".to_string()
}
InputContent::InputAudio { .. } => {
"[Audio]".to_string()
}
}
})
}
};
acc + " " + &content_text
}
// Skip non-message items (references, outputs, etc.)
_ => acc,
InputParam::SingleItem(item) => {
// Normalize single-item input for extraction behavior parity.
match item {
InputItem::Message(msg) => {
let mut extracted = String::new();
append_message_content_text(&mut extracted, &msg.content);
extracted
}
})
_ => String::new(),
}
}
InputParam::Items(items) => {
let mut extracted = String::new();
for item in items {
if let InputItem::Message(msg) = item {
// Preserve existing behavior: each message is prefixed with a space.
extracted.push(' ');
append_message_content_text(&mut extracted, &msg.content);
}
}
extracted
}
}
}
@ -1068,6 +1104,20 @@ impl ProviderRequest for ResponsesAPIRequest {
fn get_recent_user_message(&self) -> Option<String> {
match &self.input {
InputParam::Text(text) => Some(text.clone()),
InputParam::SingleItem(item) => match item {
InputItem::Message(msg) if matches!(msg.role, MessageRole::User) => {
match &msg.content {
MessageContent::Text(text) => Some(text.clone()),
MessageContent::Items(content_items) => {
content_items.iter().find_map(|content| match content {
InputContent::InputText { text } => Some(text.clone()),
_ => None,
})
}
}
}
_ => None,
},
InputParam::Items(items) => {
items.iter().rev().find_map(|item| {
match item {
@ -1097,6 +1147,9 @@ impl ProviderRequest for ResponsesAPIRequest {
.iter()
.filter_map(|tool| match tool {
Tool::Function { name, .. } => Some(name.clone()),
Tool::Custom {
name: Some(name), ..
} => Some(name.clone()),
// Other tool types don't have user-defined names
_ => None,
})
@ -1366,6 +1419,7 @@ impl crate::providers::streaming_response::ProviderStreamResponse for ResponsesA
#[cfg(test)]
mod tests {
use super::*;
use serde_json::json;
#[test]
fn test_response_output_text_delta_deserialization() {
@ -1506,4 +1560,87 @@ mod tests {
_ => panic!("Expected ResponseCompleted event"),
}
}
#[test]
fn test_request_deserializes_custom_tool() {
    // A Codex-style request carrying a provider-specific "custom" tool.
    let payload = br#"{
        "model": "gpt-5.3-codex",
        "input": "apply the patch",
        "tools": [
            {
                "type": "custom",
                "name": "run_patch",
                "description": "Apply patch text",
                "format": {"kind": "patch", "version": "v1"}
            }
        ]
    }"#;
    let parsed = ResponsesAPIRequest::try_from(payload.as_slice()).unwrap();
    let tools = parsed.tools.expect("tools should be present");
    assert_eq!(tools.len(), 1);
    match &tools[0] {
        Tool::Custom {
            name,
            description,
            format,
        } => {
            assert_eq!(name.as_deref(), Some("run_patch"));
            assert_eq!(description.as_deref(), Some("Apply patch text"));
            assert!(format.is_some());
        }
        _ => panic!("expected custom tool"),
    }
}
#[test]
fn test_request_deserializes_web_search_tool_alias() {
    // "web_search" is accepted as an alias of the "web_search_preview" tool.
    let payload = br#"{
        "model": "gpt-5.3-codex",
        "input": "find repository info",
        "tools": [
            {
                "type": "web_search",
                "domains": ["github.com"],
                "search_context_size": "medium"
            }
        ]
    }"#;
    let parsed = ResponsesAPIRequest::try_from(payload.as_slice()).unwrap();
    let tools = parsed.tools.expect("tools should be present");
    assert_eq!(tools.len(), 1);
    match &tools[0] {
        Tool::WebSearchPreview {
            domains,
            search_context_size,
            ..
        } => {
            assert_eq!(domains.as_ref().map(Vec::len), Some(1));
            assert_eq!(search_context_size.as_deref(), Some("medium"));
        }
        _ => panic!("expected web search preview tool"),
    }
}
#[test]
fn test_request_deserializes_text_config_without_format() {
    // An empty "text" object must still deserialize, with format == None.
    let payload = br#"{"model":"gpt-5.3-codex","input":"hello","text":{}}"#;
    let parsed = ResponsesAPIRequest::try_from(payload.as_slice()).unwrap();
    let text_config = parsed.text.expect("text config should be present");
    assert!(text_config.format.is_none());
}
}

View file

@ -74,6 +74,7 @@ pub struct ResponsesAPIStreamBuffer {
/// Lifecycle state flags
created_emitted: bool,
in_progress_emitted: bool,
finalized: bool,
/// Track which output items we've added
output_items_added: HashMap<i32, String>, // output_index -> item_id
@ -109,6 +110,7 @@ impl ResponsesAPIStreamBuffer {
upstream_response_metadata: None,
created_emitted: false,
in_progress_emitted: false,
finalized: false,
output_items_added: HashMap::new(),
text_content: HashMap::new(),
function_arguments: HashMap::new(),
@ -236,7 +238,7 @@ impl ResponsesAPIStreamBuffer {
}),
store: Some(true),
text: Some(TextConfig {
format: TextFormat::Text,
format: Some(TextFormat::Text),
}),
audio: None,
modalities: None,
@ -255,8 +257,38 @@ impl ResponsesAPIStreamBuffer {
/// Finalize the response by emitting all *.done events and response.completed.
/// Call this when the stream is complete (after seeing [DONE] or end_of_stream).
pub fn finalize(&mut self) {
// Idempotent finalize: avoid duplicate response.completed loops.
if self.finalized {
return;
}
self.finalized = true;
let mut events = Vec::new();
// Ensure lifecycle prelude is emitted even if finalize is triggered
// by finish_reason before any prior delta was processed.
if !self.created_emitted {
if self.response_id.is_none() {
self.response_id = Some(format!(
"resp_{}",
uuid::Uuid::new_v4().to_string().replace("-", "")
));
self.created_at = Some(
std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as i64,
);
self.model = Some("unknown".to_string());
}
events.push(self.create_response_created_event());
self.created_emitted = true;
}
if !self.in_progress_emitted {
events.push(self.create_response_in_progress_event());
self.in_progress_emitted = true;
}
// Emit done events for all accumulated content
// Text content done events
@ -443,6 +475,12 @@ impl SseStreamBufferTrait for ResponsesAPIStreamBuffer {
}
};
// Explicit completion marker from transform layer.
if matches!(stream_event.as_ref(), ResponsesAPIStreamEvent::Done { .. }) {
self.finalize();
return;
}
let mut events = Vec::new();
// Capture upstream metadata from ResponseCreated or ResponseInProgress if present
@ -789,4 +827,30 @@ mod tests {
println!("✓ NO completion events (partial stream, no [DONE])");
println!("✓ Arguments accumulated: '{{\"location\":\"'\n");
}
#[test]
fn test_finish_reason_without_done_still_finalizes_once() {
    // Two chat-completion chunks: a content delta, then a finish_reason
    // with no trailing [DONE] marker. finalize() must still fire, and
    // response.completed must be emitted exactly once (idempotent finalize).
    let raw_input = r#"data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{"role":"assistant","content":"Hello"},"finish_reason":null}]}
data: {"id":"chatcmpl-123","object":"chat.completion.chunk","created":1234567890,"model":"gpt-4o","choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}"#;
    let client = SupportedAPIsFromClient::OpenAIResponsesAPI(OpenAIApi::Responses);
    let upstream = SupportedUpstreamAPIs::OpenAIChatCompletions(OpenAIApi::ChatCompletions);
    let mut buffer = ResponsesAPIStreamBuffer::new();
    for raw_event in SseStreamIter::try_from(raw_input.as_bytes()).unwrap() {
        let transformed = SseEvent::try_from((raw_event, &client, &upstream)).unwrap();
        buffer.add_transformed_event(transformed);
    }
    let rendered = String::from_utf8_lossy(&buffer.to_bytes()).to_string();
    assert_eq!(
        rendered.matches("event: response.completed").count(),
        1,
        "response.completed should be emitted exactly once"
    );
}
}