diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml
index 1e60bfd5..ae9d0fbc 100644
--- a/arch/envoy.template.yaml
+++ b/arch/envoy.template.yaml
@@ -51,7 +51,7 @@ static_resources:
                   envoy_grpc:
                     cluster_name: opentelemetry_collector
                   timeout: 0.250s
-                service_name: arch_gateway
+                service_name: archgw(inbound)
             random_sampling:
               value: {{ arch_tracing.random_sampling }}
           {% endif %}
@@ -416,7 +416,7 @@ static_resources:
                   envoy_grpc:
                     cluster_name: opentelemetry_collector
                   timeout: 0.250s
-                service_name: arch_gateway
+                service_name: archgw(outbound)
             random_sampling:
               value: {{ arch_tracing.random_sampling }}
           {% endif %}
@@ -497,21 +497,6 @@ static_resources:
       - name: envoy.filters.network.http_connection_manager
         typed_config:
           "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
-          {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
-          generate_request_id: true
-          tracing:
-            provider:
-              name: envoy.tracers.opentelemetry
-              typed_config:
-                "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
-                grpc_service:
-                  envoy_grpc:
-                    cluster_name: opentelemetry_collector
-                  timeout: 0.250s
-                service_name: egress_traffic_llm
-            random_sampling:
-              value: {{ arch_tracing.random_sampling }}
-          {% endif %}
           stat_prefix: egress_traffic
           codec_type: AUTO
           scheme_header_transformation:
diff --git a/crates/common/src/tracing.rs b/crates/common/src/tracing.rs
index 363a0870..60ccca15 100644
--- a/crates/common/src/tracing.rs
+++ b/crates/common/src/tracing.rs
@@ -160,16 +160,41 @@ impl TraceData {
         }
     }
 
+    pub fn new_with_service_name(service_name: String) -> Self {
+        let mut resource_attributes = Vec::new();
+        resource_attributes.push(Attribute {
+            key: "service.name".to_string(),
+            value: AttributeValue {
+                string_value: Some(service_name),
+            },
+        });
+
+        let resource = Resource {
+            attributes: resource_attributes,
+        };
+
+        let scope_span = ScopeSpan {
+            scope: Scope {
+                name: "default".to_string(),
+                version: "1.0".to_string(),
+                attributes: Vec::new(),
+            },
+            spans: Vec::new(),
+        };
+
+        let resource_span = ResourceSpan {
+            resource,
+            scope_spans: vec![scope_span],
+        };
+
+        TraceData {
+            resource_spans: vec![resource_span],
+        }
+    }
+
     pub fn add_span(&mut self, span: Span) {
         if self.resource_spans.is_empty() {
-            let resource = Resource {
-                attributes: vec![Attribute {
-                    key: "service.name".to_string(),
-                    value: AttributeValue {
-                        string_value: Some("egress_llm_traffic".to_string()),
-                    },
-                }],
-            };
+            let resource = Resource { attributes: Vec::new() };
             let scope_span = ScopeSpan {
                 scope: Scope {
                     name: "default".to_string(),
diff --git a/crates/hermesllm/src/clients/endpoints.rs b/crates/hermesllm/src/clients/endpoints.rs
index 4e23c942..5177ce97 100644
--- a/crates/hermesllm/src/clients/endpoints.rs
+++ b/crates/hermesllm/src/clients/endpoints.rs
@@ -21,10 +21,29 @@ impl fmt::Display for SupportedAPIs {
     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
         match self {
             SupportedAPIs::OpenAIChatCompletions(api) => {
-                write!(f, "OpenAI API ({})", api.endpoint())
+                write!(f, "OpenAI ({})", api.endpoint())
             }
             SupportedAPIs::AnthropicMessagesAPI(api) => {
-                write!(f, "Anthropic API ({})", api.endpoint())
+                write!(f, "Anthropic AI ({})", api.endpoint())
+            }
+        }
+    }
+}
+
+impl fmt::Display for SupportedUpstreamAPIs {
+    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+        match self {
+            SupportedUpstreamAPIs::OpenAIChatCompletions(api) => {
+                write!(f, "OpenAI ({})", api.endpoint())
+            }
+            SupportedUpstreamAPIs::AnthropicMessagesAPI(api) => {
+                write!(f, "Anthropic ({})", api.endpoint())
+            }
+            SupportedUpstreamAPIs::AmazonBedrockConverse(api) => {
+                write!(f, "Amazon Bedrock ({})", api.endpoint())
+            }
+            SupportedUpstreamAPIs::AmazonBedrockConverseStream(api) => {
+                write!(f, "Amazon Bedrock ({})", api.endpoint())
             }
         }
     }
diff --git a/crates/hermesllm/src/providers/response.rs b/crates/hermesllm/src/providers/response.rs
index c6c37693..f09b2c04 100644
--- a/crates/hermesllm/src/providers/response.rs
+++ b/crates/hermesllm/src/providers/response.rs
@@ -380,6 +380,16 @@ impl TryFrom<(SseEvent, &SupportedAPIs, &SupportedUpstreamAPIs)> for SseEvent {
                     transformed_event.sse_transform_buffer = format!("\n"); // suppress the event upstream for OpenAI
                 }
             }
+            (
+                SupportedAPIs::AnthropicMessagesAPI(_),
+                SupportedUpstreamAPIs::AnthropicMessagesAPI(_),
+            ) => {
+                // When both client and upstream are Anthropic, suppress event-only lines
+                // because the data line transformation already includes the event line
+                if transformed_event.is_event_only() && transformed_event.event.is_some() {
+                    transformed_event.sse_transform_buffer = String::new(); // suppress duplicate event line
+                }
+            }
             _ => {
                 // Other combinations can be handled here as needed
             }
diff --git a/crates/hermesllm/src/transforms/request/from_anthropic.rs b/crates/hermesllm/src/transforms/request/from_anthropic.rs
index 877faaa8..9dedc313 100644
--- a/crates/hermesllm/src/transforms/request/from_anthropic.rs
+++ b/crates/hermesllm/src/transforms/request/from_anthropic.rs
@@ -97,21 +97,24 @@ impl TryFrom for ConverseRequest {
         });
 
         // Convert tools and tool choice to ToolConfiguration
-        let tool_config = if req.tools.is_some() || req.tool_choice.is_some() {
-            let tools = req.tools.map(|anthropic_tools| {
-                anthropic_tools
-                    .into_iter()
-                    .map(|tool| BedrockTool::ToolSpec {
-                        tool_spec: ToolSpecDefinition {
-                            name: tool.name,
-                            description: tool.description,
-                            input_schema: ToolInputSchema {
-                                json: tool.input_schema,
-                            },
+        // Only include toolConfig if we have actual tools (Bedrock requires at least 1 tool)
+        let tool_config = req.tools.and_then(|anthropic_tools| {
+            if anthropic_tools.is_empty() {
+                return None;
+            }
+
+            let tools = anthropic_tools
+                .into_iter()
+                .map(|tool| BedrockTool::ToolSpec {
+                    tool_spec: ToolSpecDefinition {
+                        name: tool.name,
+                        description: tool.description,
+                        input_schema: ToolInputSchema {
+                            json: tool.input_schema,
                         },
-                    })
-                    .collect()
-            });
+                    },
+                })
+                .collect();
 
             let tool_choice = req.tool_choice.map(|choice| {
                 match choice.kind {
@@ -136,10 +139,11 @@ impl TryFrom for ConverseRequest {
                 }
             });
 
-            Some(ToolConfiguration { tools, tool_choice })
-        } else {
-            None
-        };
+            Some(ToolConfiguration {
+                tools: Some(tools),
+                tool_choice,
+            })
+        });
 
         Ok(ConverseRequest {
             model_id: req.model,
diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs
index 785b5b72..870530ab 100644
--- a/crates/llm_gateway/src/stream_context.rs
+++ b/crates/llm_gateway/src/stream_context.rs
@@ -54,6 +54,8 @@ pub struct StreamContext {
     user_message: Option<String>,
     upstream_status_code: Option<StatusCode>,
     binary_frame_decoder: Option>,
+    http_method: Option<String>,
+    http_protocol: Option<String>,
 }
 
 impl StreamContext {
@@ -83,6 +85,8 @@ impl StreamContext {
             user_message: None,
             upstream_status_code: None,
            binary_frame_decoder: None,
+            http_method: None,
+            http_protocol: None,
         }
     }
 
@@ -282,7 +286,7 @@ impl StreamContext {
             }
         }
     }
-    fn handle_end_of_stream_metrics_and_traces(&mut self, current_time: SystemTime) {
+    fn handle_end_of_request_metrics_and_traces(&mut self, current_time: SystemTime) {
         // All streaming responses end with bytes=0 and end_stream=true
         // Record the latency for the request
         match current_time.duration_since(self.start_time) {
@@ -332,9 +336,18 @@ impl StreamContext {
                     warn!("traceparent header is invalid: {}", e);
                 }
                 Ok(traceparent) => {
-                    let mut trace_data = common::tracing::TraceData::new();
+                    let service_name = match &self.resolved_api {
+                        Some(api) => {
+                            let api_display = api.to_string();
+                            format!("archgw.{}", api_display)
+                        }
+                        None => "archgw".to_string(),
+                    };
+
+                    let mut trace_data =
+                        common::tracing::TraceData::new_with_service_name(service_name);
                     let mut llm_span = Span::new(
-                        "egress_traffic".to_string(),
+                        self.llm_provider().name.to_string(),
                         Some(traceparent.trace_id),
                         Some(traceparent.parent_id),
                         self.request_body_sent_time.unwrap(),
@@ -344,17 +357,34 @@ impl StreamContext {
                         .add_attribute("model".to_string(), self.llm_provider().name.to_string());
 
                     if let Some(user_message) = &self.user_message {
-                        llm_span.add_attribute("user_message".to_string(), user_message.clone());
+                        llm_span.add_attribute("message".to_string(), user_message.clone());
                     }
 
+                    // Add HTTP attributes
+                    if let Some(method) = &self.http_method {
+                        llm_span.add_attribute("http.method".to_string(), method.clone());
+                    }
+                    if let Some(protocol) = &self.http_protocol {
+                        llm_span.add_attribute("http.protocol".to_string(), protocol.clone());
+                    }
+                    if let Some(status_code) = &self.upstream_status_code {
+                        llm_span.add_attribute(
+                            "http.status_code".to_string(),
+                            status_code.as_u16().to_string(),
+                        );
+                    }
+
+                    // Add request ID attribute
+                    llm_span
+                        .add_attribute("http.request_id".to_string(), self.request_identifier());
+
                     if self.ttft_time.is_some() {
                         llm_span.add_event(Event::new(
                             "time_to_first_token".to_string(),
                             self.ttft_time.unwrap(),
                         ));
-                        trace_data.add_span(llm_span);
                     }
-
+                    trace_data.add_span(llm_span);
                     self.traces_queue.lock().unwrap().push_back(trace_data);
                 }
             };
@@ -460,7 +490,7 @@ impl StreamContext {
         };
 
         // Extract ProviderStreamResponse for processing (token counting, etc.)
-        if !transformed_event.is_done() {
+        if !transformed_event.is_done() && !transformed_event.is_event_only() {
             match transformed_event.provider_response() {
                 Ok(provider_response) => {
                     self.record_ttft_if_needed();
@@ -722,6 +752,10 @@ impl HttpContext for StreamContext {
             return Action::Continue;
         }
 
+        // Capture HTTP method and protocol for tracing
+        self.http_method = self.get_http_request_header(":method");
+        self.http_protocol = self.get_http_request_header(":scheme");
+
         self.streaming_response = self
             .get_http_request_header(ARCH_IS_STREAMING_HEADER)
             .map(|val| val == "true")
@@ -1058,6 +1092,17 @@ impl HttpContext for StreamContext {
             return Action::Continue;
         }
 
+        let current_time = get_current_time().unwrap();
+        if end_of_stream && body_size == 0 {
+            debug!(
+                "[ARCHGW_REQ_ID:{}] RESPONSE_BODY_COMPLETE: total_bytes={}",
+                self.request_identifier(),
+                body_size
+            );
+            self.handle_end_of_request_metrics_and_traces(current_time);
+            return Action::Continue;
+        }
+
         // Check if this is an error response from upstream
         if let Some(status_code) = &self.upstream_status_code {
             if status_code.is_client_error() || status_code.is_server_error() {
@@ -1101,12 +1146,6 @@ impl HttpContext for StreamContext {
             }
         }
 
-        let current_time = get_current_time().unwrap();
-        if end_of_stream && body_size == 0 {
-            self.handle_end_of_stream_metrics_and_traces(current_time);
-            return Action::Continue;
-        }
-
         let body = match self.read_raw_response_body(body_size) {
             Ok(bytes) => bytes,
             Err(action) => return action,
@@ -1135,6 +1174,7 @@ impl HttpContext for StreamContext {
                 Err(action) => return action,
             }
         }
+
         Action::Continue
     }
 }
diff --git a/demos/use_cases/claude_code_router/config.yaml b/demos/use_cases/claude_code_router/config.yaml
index 11a98c07..e41db0c0 100644
--- a/demos/use_cases/claude_code_router/config.yaml
+++ b/demos/use_cases/claude_code_router/config.yaml
@@ -20,13 +20,12 @@ llm_providers:
     routing_preferences:
       - name: code understanding
        description: understand and explain existing code snippets, functions, or libraries
 
-  # Anthropic Models
   - model: anthropic/claude-sonnet-4-5
     default: true
     access_key: $ANTHROPIC_API_KEY
 
-  - model: anthropic/claude-3-haiku-20240307
+  - model: anthropic/claude-haiku-4-5
     access_key: $ANTHROPIC_API_KEY
 
   # Ollama Models
@@ -38,4 +37,7 @@
 model_aliases:
   # Alias for a small faster Claude model
   arch.claude.code.small.fast:
-    target: claude-3-haiku-20240307
+    target: claude-haiku-4-5
+
+tracing:
+  random_sampling: 100
diff --git a/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml b/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml
index 0a626be9..2a7e47c7 100644
--- a/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml
+++ b/demos/use_cases/model_alias_routing/arch_config_with_aliases.yaml
@@ -78,3 +78,6 @@ model_aliases:
 
   coding-model:
     target: us.amazon.nova-premier-v1:0
+
+tracing:
+  random_sampling: 100