fixed for claude code routing. first commit

This commit is contained in:
Salman Paracha 2025-09-26 22:25:59 -07:00
parent 03c2cf6f0d
commit 39bd786280
8 changed files with 402 additions and 70 deletions

View file

@ -9,6 +9,7 @@ from cli.docker_cli import docker_validate_archgw_schema, stream_gateway_logs
from cli.utils import (
getLogger,
get_llm_provider_access_keys,
has_ingress_listener,
load_env_file_to_dict,
stream_access_logs,
)
@ -240,8 +241,15 @@ def up(file, path, service, foreground):
if service == SERVICE_NAME_ARCHGW:
start_arch(arch_config_file, env, foreground=foreground)
else:
download_models_from_hf()
start_arch_modelserver(foreground)
# Check if ingress_traffic listener is configured before starting model_server
if has_ingress_listener(arch_config_file):
download_models_from_hf()
start_arch_modelserver(foreground)
else:
log.info(
"Skipping model_server startup: no ingress_traffic listener configured in arch_config.yaml"
)
start_arch(arch_config_file, env, foreground=foreground)

View file

@ -21,6 +21,22 @@ def getLogger(name="cli"):
log = getLogger(__name__)
def has_ingress_listener(arch_config_file):
"""Check if the arch config file has ingress_traffic listener configured."""
try:
with open(arch_config_file) as f:
arch_config_dict = yaml.safe_load(f)
ingress_traffic = arch_config_dict.get("listeners", {}).get(
"ingress_traffic", {}
)
return bool(ingress_traffic)
except Exception as e:
log.error(f"Error reading config file {arch_config_file}: {e}")
return False
def get_llm_provider_access_keys(arch_config_file):
with open(arch_config_file, "r") as file:
arch_config = file.read()

View file

@ -126,8 +126,9 @@ pub async fn chat(
});
const MAX_MESSAGE_LENGTH: usize = 50;
let latest_message_for_log = if latest_message_for_log.len() > MAX_MESSAGE_LENGTH {
format!("{}...", &latest_message_for_log[..MAX_MESSAGE_LENGTH])
let latest_message_for_log = if latest_message_for_log.chars().count() > MAX_MESSAGE_LENGTH {
let truncated: String = latest_message_for_log.chars().take(MAX_MESSAGE_LENGTH).collect();
format!("{}...", truncated)
} else {
latest_message_for_log
};

File diff suppressed because one or more lines are too long

View file

@ -352,6 +352,7 @@ impl TryFrom<ChatCompletionsStreamResponse> for MessagesStreamEvent {
let choice = &resp.choices[0];
// Handle final chunk with usage
let has_usage = resp.usage.is_some();
if let Some(usage) = resp.usage {
if let Some(finish_reason) = &choice.finish_reason {
let anthropic_stop_reason: MessagesStopReason = finish_reason.clone().into();
@ -403,11 +404,27 @@ impl TryFrom<ChatCompletionsStreamResponse> for MessagesStreamEvent {
return convert_tool_call_deltas(tool_calls.clone());
}
// Handle finish reason
// Handle finish reason - generate MessageDelta only (MessageStop comes later)
if let Some(finish_reason) = &choice.finish_reason {
if *finish_reason == FinishReason::Stop {
return Ok(MessagesStreamEvent::MessageStop);
// If we have usage data, it was already handled above
// If not, we need to generate MessageDelta with default usage
if !has_usage {
let anthropic_stop_reason: MessagesStopReason = finish_reason.clone().into();
return Ok(MessagesStreamEvent::MessageDelta {
delta: MessagesMessageDelta {
stop_reason: anthropic_stop_reason,
stop_sequence: None,
},
usage: MessagesUsage {
input_tokens: 0,
output_tokens: 0,
cache_creation_input_tokens: None,
cache_read_input_tokens: None,
},
});
}
// If usage was already handled above, we don't need to do anything more here
// MessageStop will be handled when [DONE] is encountered
}
// Default to ping for unhandled cases
@ -468,18 +485,6 @@ impl TryFrom<MessagesMessage> for Vec<Message> {
}
MessagesMessageContent::Blocks(blocks) => {
let (content_parts, tool_calls, tool_results) = blocks.split_for_openai()?;
// Create main message
let content = build_openai_content(content_parts, &tool_calls);
let main_message = Message {
role: message.role.into(),
content,
name: None,
tool_calls: if tool_calls.is_empty() { None } else { Some(tool_calls) },
tool_call_id: None,
};
result.push(main_message);
// Add tool result messages
for (tool_use_id, result_text, _is_error) in tool_results {
result.push(Message {
@ -490,6 +495,20 @@ impl TryFrom<MessagesMessage> for Vec<Message> {
tool_call_id: Some(tool_use_id),
});
}
// Only create main message if there's actual content or tool calls
// Skip creating empty content messages (e.g., when message only contains tool_result blocks)
if !content_parts.is_empty() || !tool_calls.is_empty() {
let content = build_openai_content(content_parts, &tool_calls);
let main_message = Message {
role: message.role.into(),
content,
name: None,
tool_calls: if tool_calls.is_empty() { None } else { Some(tool_calls) },
tool_call_id: None,
};
result.push(main_message);
}
}
}
@ -515,9 +534,11 @@ impl TryFrom<Message> for MessagesMessage {
MessagesContentBlock::ToolResult {
tool_use_id: tool_call_id,
is_error: None,
content: vec![MessagesContentBlock::Text {
content: ToolResultContent::Blocks(vec![MessagesContentBlock::Text {
text: message.content.extract_text(),
}],
cache_control: None,
}]),
cache_control: None,
},
]),
});
@ -551,7 +572,7 @@ impl ContentUtils<ToolCall> for Vec<MessagesContentBlock> {
for block in self {
match block {
MessagesContentBlock::ToolUse { id, name, input } |
MessagesContentBlock::ToolUse { id, name, input, .. } |
MessagesContentBlock::ServerToolUse { id, name, input } |
MessagesContentBlock::McpToolUse { id, name, input } => {
let arguments = serde_json::to_string(&input)?;
@ -575,7 +596,7 @@ impl ContentUtils<ToolCall> for Vec<MessagesContentBlock> {
for block in self {
match block {
MessagesContentBlock::Text { text } => {
MessagesContentBlock::Text { text, .. } => {
content_parts.push(ContentPart::Text { text: text.clone() });
}
MessagesContentBlock::Image { source } => {
@ -587,7 +608,7 @@ impl ContentUtils<ToolCall> for Vec<MessagesContentBlock> {
},
});
}
MessagesContentBlock::ToolUse { id, name, input } |
MessagesContentBlock::ToolUse { id, name, input, .. } |
MessagesContentBlock::ServerToolUse { id, name, input } |
MessagesContentBlock::McpToolUse { id, name, input } => {
let arguments = serde_json::to_string(&input)?;
@ -597,7 +618,10 @@ impl ContentUtils<ToolCall> for Vec<MessagesContentBlock> {
function: FunctionCall { name: name.clone(), arguments },
});
}
MessagesContentBlock::ToolResult { tool_use_id, content, is_error } |
MessagesContentBlock::ToolResult { tool_use_id, content, is_error, .. } => {
let result_text = content.extract_text();
tool_results.push((tool_use_id.clone(), result_text, is_error.unwrap_or(false)));
}
MessagesContentBlock::WebSearchToolResult { tool_use_id, content, is_error } |
MessagesContentBlock::CodeExecutionToolResult { tool_use_id, content, is_error } |
MessagesContentBlock::McpToolResult { tool_use_id, content, is_error } => {
@ -819,7 +843,7 @@ fn build_openai_content(content_parts: Vec<ContentPart>, tool_calls: &[ToolCall]
fn build_anthropic_content(content_blocks: Vec<MessagesContentBlock>) -> MessagesMessageContent {
if content_blocks.len() == 1 {
match &content_blocks[0] {
MessagesContentBlock::Text { text } => MessagesMessageContent::Single(text.clone()),
MessagesContentBlock::Text { text, .. } => MessagesMessageContent::Single(text.clone()),
_ => MessagesMessageContent::Blocks(content_blocks),
}
} else if content_blocks.is_empty() {
@ -835,10 +859,10 @@ fn convert_anthropic_content_to_openai(content: &[MessagesContentBlock]) -> Resu
for block in content {
match block {
MessagesContentBlock::Text { text } => {
MessagesContentBlock::Text { text, .. } => {
text_parts.push(text.clone());
}
MessagesContentBlock::Thinking { text } => {
MessagesContentBlock::Thinking { text, .. } => {
// Include thinking as regular text for OpenAI
text_parts.push(format!("[Thinking: {}]", text));
}
@ -860,14 +884,14 @@ fn convert_openai_message_to_anthropic_content(message: &Message) -> Result<Vec<
match &message.content {
MessageContent::Text(text) => {
if !text.is_empty() {
blocks.push(MessagesContentBlock::Text { text: text.clone() });
blocks.push(MessagesContentBlock::Text { text: text.clone(), cache_control: None });
}
}
MessageContent::Parts(parts) => {
for part in parts {
match part {
ContentPart::Text { text } => {
blocks.push(MessagesContentBlock::Text { text: text.clone() });
blocks.push(MessagesContentBlock::Text { text: text.clone(), cache_control: None });
}
ContentPart::ImageUrl { image_url } => {
let source = convert_image_url_to_source(image_url);
@ -886,6 +910,7 @@ fn convert_openai_message_to_anthropic_content(message: &Message) -> Result<Vec<
id: tool_call.id.clone(),
name: tool_call.function.name.clone(),
input,
cache_control: None,
});
}
}
@ -1023,6 +1048,7 @@ fn convert_tool_call_deltas(tool_calls: Vec<ToolCallDelta>) -> Result<MessagesSt
id: id.clone(),
name: name.clone(),
input: Value::Object(serde_json::Map::new()),
cache_control: None,
},
});
}
@ -1254,6 +1280,7 @@ mod tests {
id: "call_123".to_string(),
name: "get_weather".to_string(),
input: json!({}),
cache_control: None,
},
};
@ -1566,6 +1593,7 @@ mod tests {
id: "call_weather".to_string(),
name: "get_weather".to_string(),
input: json!({}),
cache_control: None,
},
};

View file

@ -269,6 +269,13 @@ impl TryFrom<(&[u8], &SupportedAPIs, &SupportedAPIs)> for ProviderStreamResponse
Ok(ProviderStreamResponseType::ChatCompletionsStreamResponse(chat_resp))
}
(SupportedAPIs::OpenAIChatCompletions(_), SupportedAPIs::AnthropicMessagesAPI(_)) => {
// Special case: Handle [DONE] marker for OpenAI -> Anthropic conversion
if bytes == b"[DONE]" {
return Ok(ProviderStreamResponseType::MessagesStreamEvent(
crate::apis::anthropic::MessagesStreamEvent::MessageStop
));
}
let openai_resp: crate::apis::openai::ChatCompletionsStreamResponse = serde_json::from_slice(bytes)?;
// Transform to Anthropic Messages stream format using the transformer
@ -287,8 +294,8 @@ impl TryFrom<(SseEvent, &SupportedAPIs, &SupportedAPIs)> for SseEvent {
// Create a new transformed event based on the original
let mut transformed_event = sse_event;
// If not [DONE] and has data, parse the data as a provider stream response (business logic layer)
if !transformed_event.is_done() && transformed_event.data.is_some() {
// If has data, parse the data as a provider stream response (business logic layer)
if transformed_event.data.is_some() {
let data_str = transformed_event.data.as_ref().unwrap();
let data_bytes = data_str.as_bytes();
let transformed_response = ProviderStreamResponseType::try_from((data_bytes, client_api, upstream_api))?;
@ -380,6 +387,7 @@ where
I::Item: AsRef<str>,
{
pub lines: I,
pub done_seen: bool,
}
impl<I> SseStreamIter<I>
@ -388,7 +396,7 @@ where
I::Item: AsRef<str>,
{
pub fn new(lines: I) -> Self {
Self { lines }
Self { lines, done_seen: false }
}
}
@ -411,14 +419,20 @@ where
type Item = SseEvent;
fn next(&mut self) -> Option<Self::Item> {
// If we already returned [DONE], terminate the stream
if self.done_seen {
return None;
}
for line in &mut self.lines {
let line_str = line.as_ref();
// Try to parse as either data: or event: line
if let Ok(event) = line_str.parse::<SseEvent>() {
// For data: lines, check if this is the [DONE] marker - if so, end the stream
// For data: lines, check if this is the [DONE] marker
if event.data.is_some() && event.is_done() {
return None;
self.done_seen = true;
return Some(event); // Return [DONE] event for transformation
}
// For data: lines, skip events that should be filtered at the transport layer
if event.data.is_some() && event.should_skip() {
@ -706,7 +720,11 @@ mod tests {
assert!(event2.data.as_ref().unwrap().contains("msg2"));
assert!(!event2.should_skip());
// Iterator should end at [DONE] (no more events)
// Third event should be [DONE]
let done_event = iter.next().unwrap();
assert!(done_event.is_done());
// Iterator should end after [DONE]
assert!(iter.next().is_none());
}
@ -745,7 +763,11 @@ mod tests {
assert!(!event4.is_event_only());
assert!(event4.data.as_ref().unwrap().contains("Hello"));
// Iterator should end at [DONE]
// Fifth event should be [DONE]
let done_event = iter.next().unwrap();
assert!(done_event.is_done());
// Iterator should end after [DONE]
assert!(iter.next().is_none());
}
@ -776,4 +798,25 @@ mod tests {
let provider_type = ProviderStreamResponseType::ChatCompletionsStreamResponse(openai_event);
assert_eq!(provider_type.event_type(), None);
}
#[test]
fn test_done_marker_handled_in_stream_response_transformation() {
use crate::apis::anthropic::AnthropicApi;
// Test that [DONE] marker is properly converted to MessageStop in the transformation layer
let done_bytes = b"[DONE]";
let client_api = SupportedAPIs::AnthropicMessagesAPI(AnthropicApi::Messages);
let upstream_api = SupportedAPIs::OpenAIChatCompletions(crate::apis::openai::OpenAIApi::ChatCompletions);
let result = ProviderStreamResponseType::try_from((done_bytes.as_slice(), &client_api, &upstream_api));
assert!(result.is_ok());
if let Ok(ProviderStreamResponseType::MessagesStreamEvent(event)) = result {
// Verify it's a MessageStop event
assert_eq!(event.event_type(), Some("message_stop"));
assert!(matches!(event, crate::apis::anthropic::MessagesStreamEvent::MessageStop));
} else {
panic!("Expected MessagesStreamEvent::MessageStop");
}
}
}

View file

@ -395,23 +395,15 @@ impl StreamContext {
}
}
fn debug_log_body(&self, body: &[u8]) {
debug!(
"[ARCHGW_REQ_ID:{}] UPSTREAM_RAW_RESPONSE: body_size={} content={}",
self.request_identifier(),
body.len(),
String::from_utf8_lossy(body)
);
}
fn handle_streaming_response(
&mut self,
body: &[u8],
provider_id: ProviderId,
) -> Result<Vec<u8>, Action> {
debug!(
"[ARCHGW_REQ_ID:{}] STREAMING_PROCESS: provider_id={:?} chunk_size={}",
"[ARCHGW_REQ_ID:{}] STREAMING_PROCESS: client={:?} provider_id={:?} chunk_size={}",
self.request_identifier(),
self.client_api,
provider_id,
body.len()
);
@ -958,7 +950,12 @@ impl HttpContext for StreamContext {
Err(action) => return action,
};
self.debug_log_body(&body);
debug!(
"[ARCHGW_REQ_ID:{}] UPSTREAM_RAW_RESPONSE: body_size={} content={}",
self.request_identifier(),
body.len(),
String::from_utf8_lossy(&body)
);
let provider_id = self.get_provider_id();
if self.streaming_response {

View file

@ -417,12 +417,12 @@ def test_anthropic_client_with_openai_model_streaming():
client = anthropic.Anthropic(api_key="test-key", base_url=base_url)
with client.messages.stream(
model="gpt-4o-mini", # OpenAI model via Anthropic client
max_tokens=50,
model="gpt-5-mini-2025-08-07", # OpenAI model via Anthropic client
max_tokens=500,
messages=[
{
"role": "user",
"content": "Hello, please respond with exactly: Hello from GPT-4o-mini via Anthropic!",
"content": "Hello, please respond with exactly: Hello from ChatGPT!",
}
],
) as stream:
@ -435,8 +435,8 @@ def test_anthropic_client_with_openai_model_streaming():
# A safe way to reassemble text from the content blocks:
final_text = "".join(b.text for b in final.content if b.type == "text")
assert full_text == "Hello from GPT-4o-mini via Anthropic!"
assert final_text == "Hello from GPT-4o-mini via Anthropic!"
assert full_text == "Hello from ChatGPT!"
assert final_text == "Hello from ChatGPT!"
def test_openai_gpt4o_mini_v1_messages_api():