Merge remote-tracking branch 'origin/main' into pytest

This commit is contained in:
Salman Paracha 2025-10-24 15:46:53 -07:00
commit cecc3db27d
8 changed files with 149 additions and 61 deletions

View file

@ -51,7 +51,7 @@ static_resources:
envoy_grpc: envoy_grpc:
cluster_name: opentelemetry_collector cluster_name: opentelemetry_collector
timeout: 0.250s timeout: 0.250s
service_name: arch_gateway service_name: archgw(inbound)
random_sampling: random_sampling:
value: {{ arch_tracing.random_sampling }} value: {{ arch_tracing.random_sampling }}
{% endif %} {% endif %}
@ -416,7 +416,7 @@ static_resources:
envoy_grpc: envoy_grpc:
cluster_name: opentelemetry_collector cluster_name: opentelemetry_collector
timeout: 0.250s timeout: 0.250s
service_name: arch_gateway service_name: archgw(outbound)
random_sampling: random_sampling:
value: {{ arch_tracing.random_sampling }} value: {{ arch_tracing.random_sampling }}
{% endif %} {% endif %}
@ -497,21 +497,6 @@ static_resources:
- name: envoy.filters.network.http_connection_manager - name: envoy.filters.network.http_connection_manager
typed_config: typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
{% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
generate_request_id: true
tracing:
provider:
name: envoy.tracers.opentelemetry
typed_config:
"@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
grpc_service:
envoy_grpc:
cluster_name: opentelemetry_collector
timeout: 0.250s
service_name: egress_traffic_llm
random_sampling:
value: {{ arch_tracing.random_sampling }}
{% endif %}
stat_prefix: egress_traffic stat_prefix: egress_traffic
codec_type: AUTO codec_type: AUTO
scheme_header_transformation: scheme_header_transformation:

View file

@ -160,16 +160,41 @@ impl TraceData {
} }
} }
pub fn new_with_service_name(service_name: String) -> Self {
let mut resource_attributes = Vec::new();
resource_attributes.push(Attribute {
key: "service.name".to_string(),
value: AttributeValue {
string_value: Some(service_name),
},
});
let resource = Resource {
attributes: resource_attributes,
};
let scope_span = ScopeSpan {
scope: Scope {
name: "default".to_string(),
version: "1.0".to_string(),
attributes: Vec::new(),
},
spans: Vec::new(),
};
let resource_span = ResourceSpan {
resource,
scope_spans: vec![scope_span],
};
TraceData {
resource_spans: vec![resource_span],
}
}
pub fn add_span(&mut self, span: Span) { pub fn add_span(&mut self, span: Span) {
if self.resource_spans.is_empty() { if self.resource_spans.is_empty() {
let resource = Resource { let resource = Resource { attributes: Vec::new() };
attributes: vec![Attribute {
key: "service.name".to_string(),
value: AttributeValue {
string_value: Some("egress_llm_traffic".to_string()),
},
}],
};
let scope_span = ScopeSpan { let scope_span = ScopeSpan {
scope: Scope { scope: Scope {
name: "default".to_string(), name: "default".to_string(),

View file

@ -21,10 +21,29 @@ impl fmt::Display for SupportedAPIs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self { match self {
SupportedAPIs::OpenAIChatCompletions(api) => { SupportedAPIs::OpenAIChatCompletions(api) => {
write!(f, "OpenAI API ({})", api.endpoint()) write!(f, "OpenAI ({})", api.endpoint())
} }
SupportedAPIs::AnthropicMessagesAPI(api) => { SupportedAPIs::AnthropicMessagesAPI(api) => {
write!(f, "Anthropic API ({})", api.endpoint()) write!(f, "Anthropic AI ({})", api.endpoint())
}
}
}
}
impl fmt::Display for SupportedUpstreamAPIs {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SupportedUpstreamAPIs::OpenAIChatCompletions(api) => {
write!(f, "OpenAI ({})", api.endpoint())
}
SupportedUpstreamAPIs::AnthropicMessagesAPI(api) => {
write!(f, "Anthropic ({})", api.endpoint())
}
SupportedUpstreamAPIs::AmazonBedrockConverse(api) => {
write!(f, "Amazon Bedrock ({})", api.endpoint())
}
SupportedUpstreamAPIs::AmazonBedrockConverseStream(api) => {
write!(f, "Amazon Bedrock ({})", api.endpoint())
} }
} }
} }

View file

@ -380,6 +380,16 @@ impl TryFrom<(SseEvent, &SupportedAPIs, &SupportedUpstreamAPIs)> for SseEvent {
transformed_event.sse_transform_buffer = format!("\n"); // suppress the event upstream for OpenAI transformed_event.sse_transform_buffer = format!("\n"); // suppress the event upstream for OpenAI
} }
} }
(
SupportedAPIs::AnthropicMessagesAPI(_),
SupportedUpstreamAPIs::AnthropicMessagesAPI(_),
) => {
// When both client and upstream are Anthropic, suppress event-only lines
// because the data line transformation already includes the event line
if transformed_event.is_event_only() && transformed_event.event.is_some() {
transformed_event.sse_transform_buffer = String::new(); // suppress duplicate event line
}
}
_ => { _ => {
// Other combinations can be handled here as needed // Other combinations can be handled here as needed
} }

View file

@ -97,21 +97,24 @@ impl TryFrom<AnthropicMessagesRequest> for ConverseRequest {
}); });
// Convert tools and tool choice to ToolConfiguration // Convert tools and tool choice to ToolConfiguration
let tool_config = if req.tools.is_some() || req.tool_choice.is_some() { // Only include toolConfig if we have actual tools (Bedrock requires at least 1 tool)
let tools = req.tools.map(|anthropic_tools| { let tool_config = req.tools.and_then(|anthropic_tools| {
anthropic_tools if anthropic_tools.is_empty() {
.into_iter() return None;
.map(|tool| BedrockTool::ToolSpec { }
tool_spec: ToolSpecDefinition {
name: tool.name, let tools = anthropic_tools
description: tool.description, .into_iter()
input_schema: ToolInputSchema { .map(|tool| BedrockTool::ToolSpec {
json: tool.input_schema, tool_spec: ToolSpecDefinition {
}, name: tool.name,
description: tool.description,
input_schema: ToolInputSchema {
json: tool.input_schema,
}, },
}) },
.collect() })
}); .collect();
let tool_choice = req.tool_choice.map(|choice| { let tool_choice = req.tool_choice.map(|choice| {
match choice.kind { match choice.kind {
@ -136,10 +139,11 @@ impl TryFrom<AnthropicMessagesRequest> for ConverseRequest {
} }
}); });
Some(ToolConfiguration { tools, tool_choice }) Some(ToolConfiguration {
} else { tools: Some(tools),
None tool_choice,
}; })
});
Ok(ConverseRequest { Ok(ConverseRequest {
model_id: req.model, model_id: req.model,

View file

@ -54,6 +54,8 @@ pub struct StreamContext {
user_message: Option<String>, user_message: Option<String>,
upstream_status_code: Option<StatusCode>, upstream_status_code: Option<StatusCode>,
binary_frame_decoder: Option<BedrockBinaryFrameDecoder<bytes::BytesMut>>, binary_frame_decoder: Option<BedrockBinaryFrameDecoder<bytes::BytesMut>>,
http_method: Option<String>,
http_protocol: Option<String>,
} }
impl StreamContext { impl StreamContext {
@ -83,6 +85,8 @@ impl StreamContext {
user_message: None, user_message: None,
upstream_status_code: None, upstream_status_code: None,
binary_frame_decoder: None, binary_frame_decoder: None,
http_method: None,
http_protocol: None,
} }
} }
@ -282,7 +286,7 @@ impl StreamContext {
} }
} }
} }
fn handle_end_of_stream_metrics_and_traces(&mut self, current_time: SystemTime) { fn handle_end_of_request_metrics_and_traces(&mut self, current_time: SystemTime) {
// All streaming responses end with bytes=0 and end_stream=true // All streaming responses end with bytes=0 and end_stream=true
// Record the latency for the request // Record the latency for the request
match current_time.duration_since(self.start_time) { match current_time.duration_since(self.start_time) {
@ -332,9 +336,18 @@ impl StreamContext {
warn!("traceparent header is invalid: {}", e); warn!("traceparent header is invalid: {}", e);
} }
Ok(traceparent) => { Ok(traceparent) => {
let mut trace_data = common::tracing::TraceData::new(); let service_name = match &self.resolved_api {
Some(api) => {
let api_display = api.to_string();
format!("archgw.{}", api_display)
}
None => "archgw".to_string(),
};
let mut trace_data =
common::tracing::TraceData::new_with_service_name(service_name);
let mut llm_span = Span::new( let mut llm_span = Span::new(
"egress_traffic".to_string(), self.llm_provider().name.to_string(),
Some(traceparent.trace_id), Some(traceparent.trace_id),
Some(traceparent.parent_id), Some(traceparent.parent_id),
self.request_body_sent_time.unwrap(), self.request_body_sent_time.unwrap(),
@ -344,17 +357,34 @@ impl StreamContext {
.add_attribute("model".to_string(), self.llm_provider().name.to_string()); .add_attribute("model".to_string(), self.llm_provider().name.to_string());
if let Some(user_message) = &self.user_message { if let Some(user_message) = &self.user_message {
llm_span.add_attribute("user_message".to_string(), user_message.clone()); llm_span.add_attribute("message".to_string(), user_message.clone());
} }
// Add HTTP attributes
if let Some(method) = &self.http_method {
llm_span.add_attribute("http.method".to_string(), method.clone());
}
if let Some(protocol) = &self.http_protocol {
llm_span.add_attribute("http.protocol".to_string(), protocol.clone());
}
if let Some(status_code) = &self.upstream_status_code {
llm_span.add_attribute(
"http.status_code".to_string(),
status_code.as_u16().to_string(),
);
}
// Add request ID attribute
llm_span
.add_attribute("http.request_id".to_string(), self.request_identifier());
if self.ttft_time.is_some() { if self.ttft_time.is_some() {
llm_span.add_event(Event::new( llm_span.add_event(Event::new(
"time_to_first_token".to_string(), "time_to_first_token".to_string(),
self.ttft_time.unwrap(), self.ttft_time.unwrap(),
)); ));
trace_data.add_span(llm_span);
} }
trace_data.add_span(llm_span);
self.traces_queue.lock().unwrap().push_back(trace_data); self.traces_queue.lock().unwrap().push_back(trace_data);
} }
}; };
@ -460,7 +490,7 @@ impl StreamContext {
}; };
// Extract ProviderStreamResponse for processing (token counting, etc.) // Extract ProviderStreamResponse for processing (token counting, etc.)
if !transformed_event.is_done() { if !transformed_event.is_done() && !transformed_event.is_event_only() {
match transformed_event.provider_response() { match transformed_event.provider_response() {
Ok(provider_response) => { Ok(provider_response) => {
self.record_ttft_if_needed(); self.record_ttft_if_needed();
@ -722,6 +752,10 @@ impl HttpContext for StreamContext {
return Action::Continue; return Action::Continue;
} }
// Capture HTTP method and protocol for tracing
self.http_method = self.get_http_request_header(":method");
self.http_protocol = self.get_http_request_header(":scheme");
self.streaming_response = self self.streaming_response = self
.get_http_request_header(ARCH_IS_STREAMING_HEADER) .get_http_request_header(ARCH_IS_STREAMING_HEADER)
.map(|val| val == "true") .map(|val| val == "true")
@ -1058,6 +1092,17 @@ impl HttpContext for StreamContext {
return Action::Continue; return Action::Continue;
} }
let current_time = get_current_time().unwrap();
if end_of_stream && body_size == 0 {
debug!(
"[ARCHGW_REQ_ID:{}] RESPONSE_BODY_COMPLETE: total_bytes={}",
self.request_identifier(),
body_size
);
self.handle_end_of_request_metrics_and_traces(current_time);
return Action::Continue;
}
// Check if this is an error response from upstream // Check if this is an error response from upstream
if let Some(status_code) = &self.upstream_status_code { if let Some(status_code) = &self.upstream_status_code {
if status_code.is_client_error() || status_code.is_server_error() { if status_code.is_client_error() || status_code.is_server_error() {
@ -1101,12 +1146,6 @@ impl HttpContext for StreamContext {
} }
} }
let current_time = get_current_time().unwrap();
if end_of_stream && body_size == 0 {
self.handle_end_of_stream_metrics_and_traces(current_time);
return Action::Continue;
}
let body = match self.read_raw_response_body(body_size) { let body = match self.read_raw_response_body(body_size) {
Ok(bytes) => bytes, Ok(bytes) => bytes,
Err(action) => return action, Err(action) => return action,
@ -1135,6 +1174,7 @@ impl HttpContext for StreamContext {
Err(action) => return action, Err(action) => return action,
} }
} }
Action::Continue Action::Continue
} }
} }

View file

@ -20,13 +20,12 @@ llm_providers:
routing_preferences: routing_preferences:
- name: code understanding - name: code understanding
description: understand and explain existing code snippets, functions, or libraries description: understand and explain existing code snippets, functions, or libraries
# Anthropic Models # Anthropic Models
- model: anthropic/claude-sonnet-4-5 - model: anthropic/claude-sonnet-4-5
default: true default: true
access_key: $ANTHROPIC_API_KEY access_key: $ANTHROPIC_API_KEY
- model: anthropic/claude-3-haiku-20240307 - model: anthropic/claude-haiku-4-5
access_key: $ANTHROPIC_API_KEY access_key: $ANTHROPIC_API_KEY
# Ollama Models # Ollama Models
@ -38,4 +37,7 @@ llm_providers:
model_aliases: model_aliases:
# Alias for a small faster Claude model # Alias for a small faster Claude model
arch.claude.code.small.fast: arch.claude.code.small.fast:
target: claude-3-haiku-20240307 target: claude-haiku-4-5
tracing:
random_sampling: 100

View file

@ -78,3 +78,6 @@ model_aliases:
coding-model: coding-model:
target: us.amazon.nova-premier-v1:0 target: us.amazon.nova-premier-v1:0
tracing:
random_sampling: 100