diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml index b1c4f487..e9b60756 100644 --- a/arch/envoy.template.yaml +++ b/arch/envoy.template.yaml @@ -55,7 +55,7 @@ static_resources: random_sampling: value: {{ arch_tracing.random_sampling }} {% endif %} - stat_prefix: ingress_traffic + stat_prefix: plano(inbound) codec_type: AUTO scheme_header_transformation: scheme_to_overwrite: https @@ -95,21 +95,6 @@ static_resources: - name: envoy.filters.network.http_connection_manager typed_config: "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} - generate_request_id: true - tracing: - provider: - name: envoy.tracers.opentelemetry - typed_config: - "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig - grpc_service: - envoy_grpc: - cluster_name: opentelemetry_collector - timeout: 0.250s - service_name: ingress_traffic - random_sampling: - value: {{ arch_tracing.random_sampling }} - {% endif %} stat_prefix: ingress_traffic codec_type: AUTO scheme_header_transformation: @@ -240,7 +225,7 @@ static_resources: envoy_grpc: cluster_name: opentelemetry_collector timeout: 0.250s - service_name: outbound_api_traffic + service_name: tool random_sampling: value: {{ arch_tracing.random_sampling }} {% endif %} @@ -413,7 +398,7 @@ static_resources: envoy_grpc: cluster_name: opentelemetry_collector timeout: 0.250s - service_name: archgw(outbound) + service_name: plano(outbound) random_sampling: value: {{ arch_tracing.random_sampling }} {% endif %} diff --git a/crates/Cargo.lock b/crates/Cargo.lock index b243c365..7aa85f59 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -379,12 +379,15 @@ dependencies = [ "pretty_assertions", "proxy-wasm", "rand 0.8.5", + "reqwest", "serde", "serde_json", "serde_with", "serde_yaml", "thiserror 1.0.69", "tiktoken-rs", + "tokio", + "tracing", "url", "urlencoding", ] diff --git a/crates/brightstaff/Cargo.toml b/crates/brightstaff/Cargo.toml index 3dfd1abe..2d88e213 100644 --- a/crates/brightstaff/Cargo.toml +++ b/crates/brightstaff/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" async-openai = "0.30.1" bytes = "1.10.1" chrono = "0.4" -common = { version = "0.1.0", path = "../common" } +common = { version = "0.1.0", path = "../common", features = ["trace-collection"] } eventsource-client = "0.15.0" eventsource-stream = "0.2.3" futures = "0.3.31" diff --git a/crates/brightstaff/src/handlers/mod.rs b/crates/brightstaff/src/handlers/mod.rs index 2583b41e..177ec8a1 100644 --- a/crates/brightstaff/src/handlers/mod.rs +++ b/crates/brightstaff/src/handlers/mod.rs @@ -1,6 +1,7 @@ pub mod agent_chat_completions; pub mod agent_selector; pub mod router; +pub mod router_chat; pub mod models; pub mod function_calling; pub mod pipeline_processor; diff --git a/crates/brightstaff/src/handlers/router.rs b/crates/brightstaff/src/handlers/router.rs index c369729a..a8398c1a 100644 --- a/crates/brightstaff/src/handlers/router.rs +++ b/crates/brightstaff/src/handlers/router.rs @@ -1,8 +1,7 @@ use bytes::Bytes; -use common::configuration::{ModelAlias, ModelUsagePreference}; +use common::configuration::{LlmProvider, ModelAlias}; use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER}; -use hermesllm::apis::openai::ChatCompletionsRequest; -use hermesllm::clients::endpoints::SupportedUpstreamAPIs; +use common::traces::TraceCollector; use hermesllm::clients::SupportedAPIsFromClient; use hermesllm::{ProviderRequest, ProviderRequestType}; use http_body_util::combinators::BoxBody; @@ -11,10 +10,13 @@ use hyper::header::{self}; use hyper::{Request, Response, StatusCode}; use std::collections::HashMap; use std::sync::Arc; -use tracing::{debug, info, warn}; +use tokio::sync::RwLock; +use tracing::{debug, warn}; use crate::router::llm_router::RouterService; -use crate::handlers::utils::{create_streaming_response, PassthroughProcessor}; +use crate::handlers::utils::{create_streaming_response, PassthroughProcessor, truncate_message}; +use crate::handlers::router_chat::router_chat_get_upstream_model; +use crate::tracing::operation_component; fn full>(chunk: T) -> BoxBody { Full::new(chunk.into()) @@ -22,14 +24,26 @@ fn full>(chunk: T) -> BoxBody { .boxed() } -pub async fn router_chat( +pub async fn chat( request: Request, router_service: Arc, full_qualified_llm_provider_url: String, model_aliases: Arc>>, + llm_providers: Arc>>, + trace_collector: Arc, ) -> Result>, hyper::Error> { + let request_path = request.uri().path().to_string(); - let mut request_headers = request.headers().clone(); + let request_headers = request.headers().clone(); + + // Extract traceparent header early (Envoy should have added this) + let traceparent = request_headers + .get("traceparent") + .and_then(|h| h.to_str().ok()) + .unwrap_or("00-00000000000000000000000000000000-0000000000000000-01") + .to_string(); + + let mut request_headers = request_headers; let chat_request_bytes = request.collect().await?.to_bytes(); debug!( @@ -55,142 +69,52 @@ pub async fn router_chat( // This ensures all downstream objects use the resolved model let model_from_request = client_request.model().to_string(); let is_streaming_request = client_request.is_streaming(); - let resolved_model = if let Some(model_aliases) = model_aliases.as_ref() { - if let Some(model_alias) = model_aliases.get(&model_from_request) { - debug!( - "Model Alias: 'From {}' -> 'To {}'", - model_from_request, model_alias.target - ); - model_alias.target.clone() - } else { - model_from_request.clone() - } - } else { - model_from_request.clone() - }; + let resolved_model = resolve_model_alias(&model_from_request, &model_aliases); + + // Extract tool names and user message preview for span attributes + let tool_names = client_request.get_tool_names(); + let user_message_preview = client_request.get_recent_user_message() + .map(|msg| truncate_message(&msg, 50)); + client_request.set_model(resolved_model.clone()); - - // Clone metadata for routing and remove archgw_preference_config from original - let routing_metadata = client_request.metadata().clone(); - if client_request.remove_metadata_key("archgw_preference_config") { debug!("Removed archgw_preference_config from metadata"); } let client_request_bytes_for_upstream = ProviderRequestType::to_bytes(&client_request).unwrap(); - // Convert to ChatCompletionsRequest regardless of input type (clone to avoid moving original) - let chat_completions_request_for_arch_router: ChatCompletionsRequest = - match ProviderRequestType::try_from(( - client_request, - &SupportedUpstreamAPIs::OpenAIChatCompletions( - hermesllm::apis::OpenAIApi::ChatCompletions, - ), - )) { - Ok(ProviderRequestType::ChatCompletionsRequest(req)) => req, - Ok( - ProviderRequestType::MessagesRequest(_) - | ProviderRequestType::BedrockConverse(_) - | ProviderRequestType::BedrockConverseStream(_) - | ProviderRequestType::ResponsesAPIRequest(_), - ) => { - // This should not happen after conversion to OpenAI format - warn!("Unexpected: got non-ChatCompletions request after converting to OpenAI format"); - let err_msg = "Request conversion failed".to_string(); - let mut bad_request = Response::new(full(err_msg)); - *bad_request.status_mut() = StatusCode::BAD_REQUEST; - return Ok(bad_request); - } - Err(err) => { - warn!( - "Failed to convert request to ChatCompletionsRequest: {}", - err - ); - let err_msg = format!("Failed to convert request: {}", err); - let mut bad_request = Response::new(full(err_msg)); - *bad_request.status_mut() = StatusCode::BAD_REQUEST; - return Ok(bad_request); - } - }; - - debug!( - "[ARCH_ROUTER REQ]: {}", - &serde_json::to_string(&chat_completions_request_for_arch_router).unwrap() - ); - - let trace_parent = request_headers - .iter() - .find(|(ty, _)| ty.as_str() == "traceparent") - .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); - - let usage_preferences_str: Option = routing_metadata.as_ref().and_then(|metadata| { - metadata - .get("archgw_preference_config") - .map(|value| value.to_string()) - }); - - let usage_preferences: Option> = usage_preferences_str - .as_ref() - .and_then(|s| serde_yaml::from_str(s).ok()); - - let latest_message_for_log = chat_completions_request_for_arch_router - .messages - .last() - .map_or("None".to_string(), |msg| { - msg.content.to_string().replace('\n', "\\n") - }); - - const MAX_MESSAGE_LENGTH: usize = 50; - let latest_message_for_log = if latest_message_for_log.chars().count() > MAX_MESSAGE_LENGTH { - let truncated: String = latest_message_for_log - .chars() - .take(MAX_MESSAGE_LENGTH) - .collect(); - format!("{}...", truncated) - } else { - latest_message_for_log - }; - - info!( - "request received, request type: chat_completion, usage preferences from request: {}, request path: {}, latest message: {}", - usage_preferences.is_some(), - request_path, - latest_message_for_log - ); - - debug!("usage preferences from request: {:?}", usage_preferences); - - let model_name = match router_service - .determine_route( - &chat_completions_request_for_arch_router.messages, - trace_parent.clone(), - usage_preferences, - ) - .await + // Determine routing using the dedicated router_chat module + let routing_result = match router_chat_get_upstream_model( + router_service, + client_request, // Pass the original request - router_chat will convert it + &request_headers, + trace_collector.clone(), + &traceparent, + &request_path, + ) + .await { - Ok(route) => match route { - Some((_, model_name)) => model_name, - None => { - info!( - "No route determined, using default model from request: {}", - chat_completions_request_for_arch_router.model - ); - chat_completions_request_for_arch_router.model.clone() - } - }, + Ok(result) => result, Err(err) => { - let err_msg = format!("Failed to determine route: {}", err); - let mut internal_error = Response::new(full(err_msg)); - *internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + let mut internal_error = Response::new(full(err.message)); + *internal_error.status_mut() = err.status_code; return Ok(internal_error); } }; + let model_name = routing_result.model_name; + debug!( "[ARCH_ROUTER] URL: {}, Resolved Model: {}", full_qualified_llm_provider_url, model_name ); + // Extract trace_parent for upstream request headers + let trace_parent = request_headers + .iter() + .find(|(ty, _)| ty.as_str() == "traceparent") + .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); + request_headers.insert( ARCH_PROVIDER_HINT_HEADER, header::HeaderValue::from_str(&model_name).unwrap(), @@ -210,6 +134,10 @@ pub async fn router_chat( // remove content-length header if it exists request_headers.remove(header::CONTENT_LENGTH); + // Capture start time right before sending request to upstream + let request_start_time = std::time::Instant::now(); + let request_start_system_time = std::time::SystemTime::now(); + let llm_response = match reqwest::Client::new() .post(full_qualified_llm_provider_url) .headers(request_headers) @@ -235,9 +163,31 @@ pub async fn router_chat( headers.insert(header_name, header_value.clone()); } - // Use the streaming utility with a passthrough processor (no modification of chunks) + // Build LLM span with actual status code using constants let byte_stream = llm_response.bytes_stream(); - let processor = PassthroughProcessor; + + // Build the LLM span (will be finalized after streaming completes) + let llm_span = build_llm_span( + &traceparent, + &request_path, + &resolved_model, + &model_name, + upstream_status.as_u16(), + is_streaming_request, + request_start_system_time, + tool_names, + user_message_preview, + &llm_providers, + ).await; + + // Use PassthroughProcessor to track streaming metrics and finalize the span + let processor = PassthroughProcessor::new( + trace_collector, + operation_component::LLM, + llm_span, + request_start_time, + ); + let streaming_response = create_streaming_response(byte_stream, processor, 16); match response.body(streaming_response.body) { @@ -250,3 +200,140 @@ pub async fn router_chat( } } } + +/// Resolves model aliases by looking up the requested model in the model_aliases map. +/// Returns the target model if an alias is found, otherwise returns the original model. +fn resolve_model_alias( + model_from_request: &str, + model_aliases: &Arc>>, +) -> String { + if let Some(aliases) = model_aliases.as_ref() { + if let Some(model_alias) = aliases.get(model_from_request) { + debug!( + "Model Alias: 'From {}' -> 'To {}'", + model_from_request, model_alias.target + ); + return model_alias.target.clone(); + } + } + model_from_request.to_string() +} + +/// Builds the LLM span with all required and optional attributes. +async fn build_llm_span( + traceparent: &str, + request_path: &str, + resolved_model: &str, + model_name: &str, + status_code: u16, + is_streaming: bool, + start_time: std::time::SystemTime, + tool_names: Option>, + user_message_preview: Option, + llm_providers: &Arc>>, +) -> common::traces::Span { + use common::traces::{SpanBuilder, SpanKind, parse_traceparent}; + use crate::tracing::{http, llm, OperationNameBuilder}; + + // Calculate the upstream path based on provider configuration + let upstream_path = get_upstream_path( + llm_providers, + model_name, + request_path, + resolved_model, + is_streaming, + ).await; + + // Build operation name showing path transformation if different + let operation_name = if request_path != upstream_path { + OperationNameBuilder::new() + .with_method("POST") + .with_path(&format!("{} >> {}", request_path, upstream_path)) + .with_target(resolved_model) + .build() + } else { + OperationNameBuilder::new() + .with_method("POST") + .with_path(request_path) + .with_target(resolved_model) + .build() + }; + + let (trace_id, parent_span_id) = parse_traceparent(traceparent); + + let mut span_builder = SpanBuilder::new(&operation_name) + .with_trace_id(&trace_id) + .with_parent_span_id(&parent_span_id) + .with_kind(SpanKind::Client) + .with_start_time(start_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::STATUS_CODE, status_code.to_string()) + .with_attribute(http::TARGET, request_path.to_string()) + .with_attribute(http::UPSTREAM_TARGET, upstream_path) + .with_attribute(llm::MODEL_NAME, resolved_model.to_string()) + .with_attribute(llm::IS_STREAMING, is_streaming.to_string()); + + // Add optional attributes + if let Some(tools) = tool_names { + span_builder = span_builder.with_attribute(llm::TOOLS, tools.join("\n")); + } + + if let Some(preview) = user_message_preview { + span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview); + } + + span_builder.build() +} + +/// Calculates the upstream path for the provider based on the model name. +/// Looks up provider configuration, gets the ProviderId and base_url_path_prefix, +/// then uses target_endpoint_for_provider to calculate the correct upstream path. +async fn get_upstream_path( + llm_providers: &Arc>>, + model_name: &str, + request_path: &str, + resolved_model: &str, + is_streaming: bool, +) -> String { + let providers_lock = llm_providers.read().await; + + // First, try to find by model name or provider name + let provider = providers_lock.iter().find(|p| { + p.model.as_ref().map(|m| m == model_name).unwrap_or(false) + || p.name == model_name + }); + + let (provider_id, base_url_path_prefix) = if let Some(provider) = provider { + let provider_id = provider.provider_interface.to_provider_id(); + let prefix = provider.base_url_path_prefix.clone(); + (provider_id, prefix) + } else { + let default_provider = providers_lock.iter().find(|p| { + p.default.unwrap_or(false) + }); + + if let Some(provider) = default_provider { + let provider_id = provider.provider_interface.to_provider_id(); + let prefix = provider.base_url_path_prefix.clone(); + (provider_id, prefix) + } else { + // Last resort: use OpenAI as hardcoded fallback + warn!("No default provider found, falling back to OpenAI"); + (hermesllm::ProviderId::OpenAI, None) + } + }; + + drop(providers_lock); + + // Calculate the upstream path using the proper API + let client_api = SupportedAPIsFromClient::from_endpoint(request_path) + .expect("Should have valid API endpoint"); + + client_api.target_endpoint_for_provider( + &provider_id, + request_path, + resolved_model, + is_streaming, + base_url_path_prefix.as_deref(), + ) +} diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs new file mode 100644 index 00000000..03c660de --- /dev/null +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -0,0 +1,239 @@ +use common::configuration::ModelUsagePreference; +use common::traces::{TraceCollector, SpanKind, SpanBuilder, parse_traceparent}; +use hermesllm::clients::endpoints::SupportedUpstreamAPIs; +use hermesllm::{ProviderRequest, ProviderRequestType}; +use hyper::StatusCode; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::{debug, info, warn}; + +use crate::router::llm_router::RouterService; +use crate::tracing::{OperationNameBuilder, operation_component, http, routing}; + +pub struct RoutingResult { + pub model_name: String +} + +pub struct RoutingError { + pub message: String, + pub status_code: StatusCode, +} + +impl RoutingError { + pub fn internal_error(message: String) -> Self { + Self { + message, + status_code: StatusCode::INTERNAL_SERVER_ERROR + } + } +} + +/// Determines the routing decision if +/// +/// # Returns +/// * `Ok(RoutingResult)` - Contains the selected model name and span ID +/// * `Err(RoutingError)` - Contains error details and optional span ID +pub async fn router_chat_get_upstream_model( + router_service: Arc, + client_request: ProviderRequestType, + request_headers: &hyper::HeaderMap, + trace_collector: Arc, + traceparent: &str, + request_path: &str, +) -> Result { + // Clone metadata for routing before converting (which consumes client_request) + let routing_metadata = client_request.metadata().clone(); + + // Convert to ChatCompletionsRequest for routing (regardless of input type) + let chat_request = match ProviderRequestType::try_from(( + client_request, + &SupportedUpstreamAPIs::OpenAIChatCompletions( + hermesllm::apis::OpenAIApi::ChatCompletions, + ), + )) { + Ok(ProviderRequestType::ChatCompletionsRequest(req)) => req, + Ok( + ProviderRequestType::MessagesRequest(_) + | ProviderRequestType::BedrockConverse(_) + | ProviderRequestType::BedrockConverseStream(_) + | ProviderRequestType::ResponsesAPIRequest(_), + ) => { + warn!("Unexpected: got non-ChatCompletions request after converting to OpenAI format"); + return Err(RoutingError::internal_error( + "Request conversion failed".to_string(), + )); + } + Err(err) => { + warn!("Failed to convert request to ChatCompletionsRequest: {}", err); + return Err(RoutingError::internal_error(format!( + "Failed to convert request: {}", + err + ))); + } + }; + + debug!( + "[ARCH_ROUTER REQ]: {}", + &serde_json::to_string(&chat_request).unwrap() + ); + + // Extract trace_parent from headers + let trace_parent = request_headers + .iter() + .find(|(ty, _)| ty.as_str() == "traceparent") + .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); + + // Extract usage preferences from metadata + let usage_preferences_str: Option = routing_metadata.as_ref().and_then(|metadata| { + metadata + .get("archgw_preference_config") + .map(|value| value.to_string()) + }); + + let usage_preferences: Option> = usage_preferences_str + .as_ref() + .and_then(|s| serde_yaml::from_str(s).ok()); + + // Prepare log message with latest message from chat request + let latest_message_for_log = chat_request + .messages + .last() + .map_or("None".to_string(), |msg| { + msg.content.to_string().replace('\n', "\\n") + }); + + const MAX_MESSAGE_LENGTH: usize = 50; + let latest_message_for_log = if latest_message_for_log.chars().count() > MAX_MESSAGE_LENGTH { + let truncated: String = latest_message_for_log + .chars() + .take(MAX_MESSAGE_LENGTH) + .collect(); + format!("{}...", truncated) + } else { + latest_message_for_log + }; + + info!( + "request received, request type: chat_completion, usage preferences from request: {}, request path: {}, latest message: {}", + usage_preferences.is_some(), + request_path, + latest_message_for_log + ); + + debug!("usage preferences from request: {:?}", usage_preferences); + + // Capture start time for routing span + let routing_start_time = std::time::Instant::now(); + let routing_start_system_time = std::time::SystemTime::now(); + + // Attempt to determine route using the router service + let routing_result = router_service + .determine_route(&chat_request.messages, trace_parent, usage_preferences) + .await; + + match routing_result { + Ok(route) => match route { + Some((_, model_name)) => { + // Record successful routing span + let mut attrs: HashMap = HashMap::new(); + attrs.insert("route.selected_model".to_string(), model_name.clone()); + record_routing_span( + trace_collector, + traceparent, + routing_start_time, + routing_start_system_time, + attrs, + ) + .await; + + Ok(RoutingResult { + model_name + }) + } + None => { + // No route determined, use default model from request + info!( + "No route determined, using default model from request: {}", + chat_request.model + ); + + let default_model = chat_request.model.clone(); + let mut attrs = HashMap::new(); + attrs.insert("route.selected_model".to_string(), default_model.clone()); + record_routing_span( + trace_collector, + traceparent, + routing_start_time, + routing_start_system_time, + attrs, + ) + .await; + + Ok(RoutingResult { + model_name: default_model + }) + } + }, + Err(err) => { + // Record failed routing span + let mut attrs = HashMap::new(); + attrs.insert("route.selected_model".to_string(), "unknown".to_string()); + attrs.insert("error.message".to_string(), err.to_string()); + record_routing_span( + trace_collector, + traceparent, + routing_start_time, + routing_start_system_time, + attrs, + ) + .await; + + Err(RoutingError::internal_error( + format!("Failed to determine route: {}", err) + )) + } + } +} + +/// Helper function to record a routing span with the given attributes. +/// Reduces code duplication across different routing outcomes. +async fn record_routing_span( + trace_collector: Arc, + traceparent: &str, + start_time: std::time::Instant, + start_system_time: std::time::SystemTime, + attrs: HashMap, +) { + // The routing always uses OpenAI Chat Completions format internally, + // so we log that as the actual API being used for routing + let routing_api_path = "/v1/chat/completions"; + + let routing_operation_name = OperationNameBuilder::new() + .with_method("POST") + .with_path(routing_api_path) + .with_target("Arch-Router-1.5B") + .build(); + + let (trace_id, parent_span_id) = parse_traceparent(traceparent); + + // Build the routing span directly using constants + let mut span_builder = SpanBuilder::new(&routing_operation_name) + .with_trace_id(&trace_id) + .with_parent_span_id(&parent_span_id) + .with_kind(SpanKind::Client) + .with_start_time(start_system_time) + .with_end_time(std::time::SystemTime::now()) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, routing_api_path.to_string()) + .with_attribute(routing::ROUTE_DETERMINATION_MS, start_time.elapsed().as_millis().to_string()); + + // Add all custom attributes + for (key, value) in attrs { + span_builder = span_builder.with_attribute(key, value); + } + + let span = span_builder.build(); + + // Record the span directly to the collector + trace_collector.record_span(operation_component::ROUTING, span); +} diff --git a/crates/brightstaff/src/handlers/utils.rs b/crates/brightstaff/src/handlers/utils.rs index 2d000874..15c02ce6 100644 --- a/crates/brightstaff/src/handlers/utils.rs +++ b/crates/brightstaff/src/handlers/utils.rs @@ -1,18 +1,27 @@ use bytes::Bytes; +use common::traces::{Span, Attribute, AttributeValue, TraceCollector, Event}; use http_body_util::combinators::BoxBody; use http_body_util::StreamBody; use hyper::body::Frame; +use std::sync::Arc; +use std::time::{Instant, SystemTime}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tracing::warn; +// Import tracing constants +use crate::tracing::{llm, error}; + /// Trait for processing streaming chunks /// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging) pub trait StreamProcessor: Send + 'static { /// Process an incoming chunk of bytes fn process_chunk(&mut self, chunk: Bytes) -> Result, String>; + /// Called when the first bytes are received (for time-to-first-token tracking) + fn on_first_bytes(&mut self) {} + /// Called when streaming completes successfully fn on_complete(&mut self) {} @@ -20,13 +29,152 @@ pub trait StreamProcessor: Send + 'static { fn on_error(&mut self, _error: &str) {} } -/// A no-op processor that just forwards chunks as-is -pub struct PassthroughProcessor; +/// A processor that tracks streaming metrics and finalizes the span +pub struct PassthroughProcessor { + collector: Arc, + service_name: String, + span: Span, + total_bytes: usize, + chunk_count: usize, + start_time: Instant, + time_to_first_token: Option, +} + +impl PassthroughProcessor { + /// Create a new passthrough processor + /// + /// # Arguments + /// * `collector` - The trace collector to record the span to + /// * `service_name` - The service name for this span (e.g., "archgw(llm)") + /// * `span` - The span to finalize after streaming completes + /// * `start_time` - When the request started (for duration calculation) + pub fn new( + collector: Arc, + service_name: impl Into, + span: Span, + start_time: Instant, + ) -> Self { + Self { + collector, + service_name: service_name.into(), + span, + total_bytes: 0, + chunk_count: 0, + start_time, + time_to_first_token: None, + } + } +} impl StreamProcessor for PassthroughProcessor { fn process_chunk(&mut self, chunk: Bytes) -> Result, String> { + self.total_bytes += chunk.len(); + self.chunk_count += 1; Ok(Some(chunk)) } + + fn on_first_bytes(&mut self) { + // Record time to first token (only for streaming) + if self.time_to_first_token.is_none() { + self.time_to_first_token = Some(self.start_time.elapsed().as_millis()); + } + } + + fn on_complete(&mut self) { + // Update span with streaming metrics and end time + let end_time_nanos = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + + self.span.end_time_unix_nano = format!("{}", end_time_nanos); + + // Add streaming metrics as attributes using constants + self.span.attributes.push(Attribute { + key: llm::RESPONSE_BYTES.to_string(), + value: AttributeValue { + string_value: Some(self.total_bytes.to_string()), + }, + }); + + + self.span.attributes.push(Attribute { + key: llm::DURATION_MS.to_string(), + value: AttributeValue { + string_value: Some(self.start_time.elapsed().as_millis().to_string()), + }, + }); + + // Add time to first token if available (streaming only) + if let Some(ttft) = self.time_to_first_token { + self.span.attributes.push(Attribute { + key: llm::TIME_TO_FIRST_TOKEN_MS.to_string(), + value: AttributeValue { + string_value: Some(ttft.to_string()), + }, + }); + + // Add time to first token as a span event + // Calculate the timestamp by adding ttft duration to span start time + if let Ok(start_time_nanos) = self.span.start_time_unix_nano.parse::() { + // Convert ttft from milliseconds to nanoseconds and add to start time + let event_timestamp = start_time_nanos + (ttft * 1_000_000); + let mut event = Event::new(llm::TIME_TO_FIRST_TOKEN_MS.to_string(), event_timestamp); + event.add_attribute( + llm::TIME_TO_FIRST_TOKEN_MS.to_string(), + ttft.to_string(), + ); + + // Initialize events vector if needed + if self.span.events.is_none() { + self.span.events = Some(Vec::new()); + } + + if let Some(ref mut events) = self.span.events { + events.push(event); + } + } + } + + // Record the finalized span + self.collector.record_span(&self.service_name, self.span.clone()); + } + + fn on_error(&mut self, error_msg: &str) { + warn!("Stream error in PassthroughProcessor: {}", error_msg); + + // Update span with error info and end time + let end_time_nanos = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + + self.span.end_time_unix_nano = format!("{}", end_time_nanos); + + self.span.attributes.push(Attribute { + key: error::ERROR.to_string(), + value: AttributeValue { + string_value: Some("true".to_string()), + }, + }); + + self.span.attributes.push(Attribute { + key: error::MESSAGE.to_string(), + value: AttributeValue { + string_value: Some(error_msg.to_string()), + }, + }); + + self.span.attributes.push(Attribute { + key: llm::DURATION_MS.to_string(), + value: AttributeValue { + string_value: Some(self.start_time.elapsed().as_millis().to_string()), + }, + }); + + // Record the error span + self.collector.record_span(&self.service_name, self.span.clone()); + } } /// Result of creating a streaming response @@ -48,6 +196,8 @@ where // Spawn a task to process and forward chunks let processor_handle = tokio::spawn(async move { + let mut is_first_chunk = true; + while let Some(item) = byte_stream.next().await { let chunk = match item { Ok(chunk) => chunk, @@ -59,6 +209,12 @@ where } }; + // Call on_first_bytes for the first chunk + if is_first_chunk { + processor.on_first_bytes(); + is_first_chunk = false; + } + // Process the chunk match processor.process_chunk(chunk) { Ok(Some(processed_chunk)) => { @@ -91,3 +247,13 @@ where processor_handle, } } + +/// Truncates a message to the specified maximum length, adding "..." if truncated. +pub fn truncate_message(message: &str, max_length: usize) -> String { + if message.chars().count() > max_length { + let truncated: String = message.chars().take(max_length).collect(); + format!("{}...", truncated) + } else { + message.to_string() + } +} diff --git a/crates/brightstaff/src/lib.rs b/crates/brightstaff/src/lib.rs index a4b5b3ae..ceff49f1 100644 --- a/crates/brightstaff/src/lib.rs +++ b/crates/brightstaff/src/lib.rs @@ -1,3 +1,4 @@ pub mod handlers; pub mod router; +pub mod tracing; pub mod utils; diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 73f5ef58..2b8b30b6 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -1,5 +1,5 @@ use brightstaff::handlers::agent_chat_completions::agent_chat; -use brightstaff::handlers::router::router_chat; +use brightstaff::handlers::router::chat; use brightstaff::handlers::models::list_models; use brightstaff::handlers::function_calling::{function_calling_chat_handler}; use brightstaff::router::llm_router::RouterService; @@ -7,6 +7,7 @@ use brightstaff::utils::tracing::init_tracer; use bytes::Bytes; use common::configuration::Configuration; use common::consts::{CHAT_COMPLETIONS_PATH, MESSAGES_PATH, OPENAI_RESPONSES_API_PATH}; +use common::traces::TraceCollector; use http_body_util::{combinators::BoxBody, BodyExt, Empty}; use hyper::body::Incoming; use hyper::server::conn::http1; @@ -46,10 +47,6 @@ async fn main() -> Result<(), Box> { let _tracer_provider = init_tracer(); let bind_address = env::var("BIND_ADDRESS").unwrap_or_else(|_| BIND_ADDRESS.to_string()); - info!( - "current working directory: {}", - env::current_dir().unwrap().display() - ); // loading arch_config.yaml file let arch_config_path = env::var("ARCH_CONFIG_PATH_RENDERED") .unwrap_or_else(|_| "./arch_config_rendered.yaml".to_string()); @@ -66,19 +63,10 @@ async fn main() -> Result<(), Box> { let llm_providers = Arc::new(RwLock::new(arch_config.model_providers.clone())); let agents_list = Arc::new(RwLock::new(arch_config.agents.clone())); let listeners = Arc::new(RwLock::new(arch_config.listeners.clone())); - - debug!( - "arch_config: {:?}", - &serde_json::to_string(arch_config.as_ref()).unwrap() - ); - let llm_provider_url = env::var("LLM_PROVIDER_ENDPOINT").unwrap_or_else(|_| "http://localhost:12001".to_string()); - info!("llm provider url: {}", llm_provider_url); - info!("listening on http://{}", bind_address); let listener = TcpListener::bind(bind_address).await?; - let routing_model_name: String = arch_config .routing .as_ref() @@ -100,18 +88,24 @@ async fn main() -> Result<(), Box> { let model_aliases = Arc::new(arch_config.model_aliases.clone()); + // Initialize trace collector and start background flusher + let trace_collector = Arc::new(TraceCollector::from_env()); + let _flusher_handle = trace_collector.clone().start_background_flusher(); + info!("Trace collector initialized and background flusher started"); + loop { let (stream, _) = listener.accept().await?; let peer_addr = stream.peer_addr()?; let io = TokioIo::new(stream); let router_service: Arc = Arc::clone(&router_service); - let model_aliases = Arc::clone(&model_aliases); + let model_aliases: Arc>> = Arc::clone(&model_aliases); let llm_provider_url = llm_provider_url.clone(); let llm_providers = llm_providers.clone(); let agents_list = agents_list.clone(); let listeners = listeners.clone(); + let trace_collector = trace_collector.clone(); let service = service_fn(move |req| { let router_service = Arc::clone(&router_service); let parent_cx = extract_context_from_request(&req); @@ -120,13 +114,14 @@ async fn main() -> Result<(), Box> { let model_aliases = Arc::clone(&model_aliases); let agents_list = agents_list.clone(); let listeners = listeners.clone(); + let trace_collector = trace_collector.clone(); async move { match (req.method(), req.uri().path()) { (&Method::POST, CHAT_COMPLETIONS_PATH | MESSAGES_PATH | OPENAI_RESPONSES_API_PATH) => { let fully_qualified_url = format!("{}{}", llm_provider_url, req.uri().path()); - router_chat(req, router_service, fully_qualified_url, model_aliases) + chat(req, router_service, fully_qualified_url, model_aliases, llm_providers, trace_collector) .with_context(parent_cx) .await } diff --git a/crates/brightstaff/src/tracing/constants.rs b/crates/brightstaff/src/tracing/constants.rs new file mode 100644 index 00000000..8557e5de --- /dev/null +++ b/crates/brightstaff/src/tracing/constants.rs @@ -0,0 +1,319 @@ +/// OpenTelemetry Semantic Conventions +/// +/// This module defines standard attribute keys following OTEL semantic conventions. +/// See: https://opentelemetry.io/docs/specs/semconv/ + +// ============================================================================= +// Span Attributes - HTTP +// ============================================================================= + +/// Semantic conventions for HTTP-related span attributes +pub mod http { + /// HTTP request method + /// Example: "GET", "POST", "PUT" + pub const METHOD: &str = "http.method"; + + /// HTTP response status code + /// Example: "200", "404", "500" + pub const STATUS_CODE: &str = "http.status_code"; + + /// Full HTTP request URL + pub const URL: &str = "http.url"; + + /// HTTP request target (path + query) + /// Example: "/v1/chat/completions?stream=true" + pub const TARGET: &str = "http.target"; + + /// Upstream target path after routing transformation + /// Example: "/api/paas/v4/chat/completions" (for Zhipu provider) + pub const UPSTREAM_TARGET: &str = "http.upstream_target"; + + /// HTTP request scheme + /// Example: "http", "https" + pub const SCHEME: &str = "http.scheme"; + + /// Value of the HTTP User-Agent header + pub const USER_AGENT: &str = "http.user_agent"; + + /// Size of the request payload body in bytes + pub const REQUEST_CONTENT_LENGTH: &str = "http.request_content_length"; + + /// Size of the response payload body in bytes + pub const RESPONSE_CONTENT_LENGTH: &str = "http.response_content_length"; +} + +// ============================================================================= +// Span Attributes - LLM Specific +// ============================================================================= + +/// Custom attributes for LLM operations +/// These follow the emerging OTEL GenAI semantic conventions +pub mod llm { + /// Name of the LLM model being called + /// Example: "gpt-4", "claude-3-sonnet", "llama-2-70b" + pub const MODEL_NAME: &str = "llm.model"; + + /// Provider of the LLM + /// Example: "openai", "anthropic", "azure-openai" + pub const PROVIDER: &str = "llm.provider"; + + /// Type of LLM operation + /// Example: "chat", "completion", "embedding" + pub const OPERATION_TYPE: &str = "llm.operation_type"; + + /// Whether the request is streaming + pub const IS_STREAMING: &str = "llm.is_streaming"; + + /// Total bytes received in the response + pub const RESPONSE_BYTES: &str = "llm.response_bytes"; + + /// Duration of the LLM call in milliseconds + pub const DURATION_MS: &str = "llm.duration_ms"; + + /// Time to first token in milliseconds (streaming only) + pub const TIME_TO_FIRST_TOKEN_MS: &str = "llm.time_to_first_token"; + + /// Number of prompt tokens used + pub const PROMPT_TOKENS: &str = "llm.usage.prompt_tokens"; + + /// Number of completion tokens generated + pub const COMPLETION_TOKENS: &str = "llm.usage.completion_tokens"; + + /// Total tokens used (prompt + completion) + pub const TOTAL_TOKENS: &str = "llm.usage.total_tokens"; + + /// Temperature parameter used + pub const TEMPERATURE: &str = "llm.request.temperature"; + + /// Max tokens parameter used + pub const MAX_TOKENS: &str = "llm.request.max_tokens"; + + /// Top-p parameter used + pub const TOP_P: &str = "llm.request.top_p"; + + /// List of tool names provided in the request + pub const TOOLS: &str = "llm.tools"; + + /// Preview of the user message (truncated) + pub const USER_MESSAGE_PREVIEW: &str = "llm.request.user_message_preview"; +} + +// ============================================================================= +// Span Attributes - Routing & Gateway +// ============================================================================= + +/// Attributes specific to LLM routing and gateway operations +pub mod routing { + /// Strategy used to select the LLM endpoint + /// Example: "round-robin", "least-latency", "cost-optimized" + pub const STRATEGY: &str = "routing.strategy"; + + /// Selected upstream endpoint + pub const UPSTREAM_ENDPOINT: &str = "routing.upstream_endpoint"; + + /// Time taken to determine the route in milliseconds + pub const ROUTE_DETERMINATION_MS: &str = "routing.determination_ms"; + + /// Whether a fallback endpoint was used + pub const IS_FALLBACK: &str = "routing.is_fallback"; + + /// Reason for route selection + pub const SELECTION_REASON: &str = "routing.selection_reason"; +} + +// ============================================================================= +// Span Attributes - Error Handling +// ============================================================================= + +/// Attributes for error and exception tracking +pub mod error { + /// Whether an error occurred + pub const ERROR: &str = "error"; + + /// Type/class of the error + /// Example: "TimeoutError", "AuthenticationError" + pub const TYPE: &str = "error.type"; + + /// Error message + pub const MESSAGE: &str = "error.message"; + + /// Stack trace of the error + pub const STACK_TRACE: &str = "error.stack_trace"; +} + +// ============================================================================= +// Operation Names +// ============================================================================= + +/// Canonical operation name components for Arch Gateway +pub mod operation_component { + /// Inbound request handling + pub const INBOUND: &str = "plano(inbound)"; + + /// Routing decision phase + pub const ROUTING: &str = "plano(routing)"; + + /// Handoff to upstream service + pub const HANDOFF: &str = "plano(handoff)"; + + /// Agent filter execution + pub const AGENT_FILTER: &str = "plano(agent filter)"; + + /// Agent execution + pub const AGENT: &str = "plano(agent)"; + + /// LLM call + pub const LLM: &str = "plano(llm)"; +} + +/// Builder for constructing standardized operation names +/// +/// Format: `{method} {path} {target}` +/// +/// The operation component (e.g., "archgw(llm)") is now part of the service name, +/// so the operation name focuses on the HTTP request details and target. +/// +/// # Examples +/// ``` +/// use brightstaff::tracing::OperationNameBuilder; +/// +/// // LLM call operation: "POST /v1/chat/completions gpt-4" +/// // (service name will be "archgw(llm)") +/// let op = OperationNameBuilder::new() +/// .with_method("POST") +/// .with_path("/v1/chat/completions") +/// .with_target("gpt-4") +/// .build(); +/// +/// // Agent filter operation: "POST /agents/v1/chat/completions hallucination-detector" +/// // (service name will be "archgw(agent filter)") +/// let op = OperationNameBuilder::new() +/// .with_method("POST") +/// .with_path("/agents/v1/chat/completions") +/// .with_target("hallucination-detector") +/// .build(); +/// +/// // Routing operation: "POST /v1/chat/completions" +/// // (service name will be "archgw(routing)") +/// let op = OperationNameBuilder::new() +/// .with_method("POST") +/// .with_path("/v1/chat/completions") +/// .build(); +/// ``` +pub struct OperationNameBuilder { + method: Option, + path: Option, + target: Option, +} + +impl OperationNameBuilder { + /// Create a new operation name builder + pub fn new() -> Self { + Self { + method: None, + path: None, + target: None, + } + } + + /// Set the HTTP method + /// + /// # Arguments + /// * `method` - HTTP method (e.g., "GET", "POST", "PUT") + pub fn with_method(mut self, method: impl Into) -> Self { + self.method = Some(method.into()); + self + } + + /// Set the request path + /// + /// # Arguments + /// * `path` - Request path (e.g., "/v1/chat/completions", "/agents/v1/chat/completions") + pub fn with_path(mut self, path: impl Into) -> Self { + self.path = Some(path.into()); + self + } + + /// Set the target (model name, agent name, or filter name) + /// + /// # Arguments + /// * `target` - Target identifier (e.g., "gpt-4", "my-agent", "hallucination-detector") + pub fn with_target(mut self, target: impl Into) -> Self { + self.target = Some(target.into()); + self + } + + /// Build the operation name string + /// + /// # Format + /// - With all components: `{method} {path} {target}` + /// - Without target: `{method} {path}` + /// - Without path: `{method}` + /// - Empty: returns empty string + pub fn build(self) -> String { + let mut parts = Vec::new(); + + if let Some(method) = self.method { + parts.push(method); + } + + if let Some(path) = self.path { + parts.push(path); + } + + if let Some(target) = self.target { + parts.push(target); + } + + parts.join(" ") + } +} + +impl Default for OperationNameBuilder { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_operation_name_full() { + let op = OperationNameBuilder::new() + .with_method("POST") + .with_path("/v1/chat/completions") + .with_target("gpt-4") + .build(); + + assert_eq!(op, "POST /v1/chat/completions gpt-4"); + } + + #[test] + fn test_operation_name_no_target() { + let op = OperationNameBuilder::new() + .with_method("POST") + .with_path("/v1/chat/completions") + .build(); + + assert_eq!(op, "POST /v1/chat/completions"); + } + + #[test] + fn test_operation_name_agent_filter() { + let op = OperationNameBuilder::new() + .with_method("POST") + .with_path("/agents/v1/chat/completions") + .with_target("content-filter") + .build(); + + assert_eq!(op, "POST /agents/v1/chat/completions content-filter"); + } + + #[test] + fn test_operation_name_minimal() { + let op = OperationNameBuilder::new().build(); + assert_eq!(op, ""); + } +} diff --git a/crates/brightstaff/src/tracing/mod.rs b/crates/brightstaff/src/tracing/mod.rs new file mode 100644 index 00000000..bacc9571 --- /dev/null +++ b/crates/brightstaff/src/tracing/mod.rs @@ -0,0 +1,3 @@ +mod constants; + +pub use constants::{OperationNameBuilder, operation_component, http, llm, error, routing}; diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index aa95e2e4..62e06da4 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -21,6 +21,15 @@ url = "2.5.4" hermesllm = { version = "0.1.0", path = "../hermesllm" } serde_with = "3.13.0" +# Optional dependencies for trace collection (not available in WASM) +tokio = { version = "1.44", features = ["sync", "time"], optional = true } +reqwest = { version = "0.12", features = ["json"], optional = true } +tracing = { version = "0.1", optional = true } + +[features] +default = [] +trace-collection = ["tokio", "reqwest", "tracing"] + [dev-dependencies] pretty_assertions = "1.4.1" serde_json = "1.0.64" diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 76c368f1..9c8f5787 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -11,4 +11,5 @@ pub mod routing; pub mod stats; pub mod tokenizer; pub mod tracing; +pub mod traces; pub mod utils; diff --git a/crates/common/src/traces/collector.rs b/crates/common/src/traces/collector.rs new file mode 100644 index 00000000..cd199866 --- /dev/null +++ b/crates/common/src/traces/collector.rs @@ -0,0 +1,259 @@ +use super::shapes::Span; +use super::resource_span_builder::ResourceSpanBuilder; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::time::{interval, Duration}; +use tracing::{debug, error, warn}; + +/// Parse W3C traceparent header into trace_id and parent_span_id +/// Format: "00-{trace_id}-{parent_span_id}-01" +/// +/// Returns (trace_id, parent_span_id) as strings +pub fn parse_traceparent(traceparent: &str) -> (String, String) { + let parts: Vec<&str> = traceparent.split('-').collect(); + if parts.len() == 4 { + (parts[1].to_string(), parts[2].to_string()) + } else { + warn!("Invalid traceparent format: {}", traceparent); + // Generate empty IDs if parsing fails + (String::new(), String::new()) + } +} + +/// Collects and batches spans, flushing them to an OTEL collector +/// +/// Supports multiple services, with each service (e.g., "archgw(routing)", "archgw(llm)") +/// maintaining its own span queue. Flushes all services together periodically. +pub struct TraceCollector { + /// Spans grouped by service name + /// Key: service name (e.g., "archgw(routing)", "archgw(llm)") + /// Value: queue of spans for that service + spans_by_service: Arc>>>, + flush_interval: Duration, + otel_url: String, +} + +impl TraceCollector { + /// Create a new trace collector + /// # Arguments + /// * `flush_interval` - How often to flush buffered spans + /// * `otel_url` - OTEL collector endpoint URL + pub fn new( + flush_interval: Duration, + otel_url: String, + ) -> Self { + Self { + spans_by_service: Arc::new(Mutex::new(HashMap::new())), + flush_interval, + otel_url, + } + } + + /// Create with defaults from environment or sensible defaults + pub fn from_env() -> Self { + let batch_size = std::env::var("TRACE_BATCH_SIZE") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(100); + + let flush_interval_secs = std::env::var("TRACE_FLUSH_INTERVAL_SECS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(1); + + let otel_url = std::env::var("OTEL_COLLECTOR_URL") + .unwrap_or_else(|_| "http://host.docker.internal:4318/v1/traces".to_string()); + + debug!( + "TraceCollector initialized: batch_size={}, flush_interval={}s, url={}", + batch_size, flush_interval_secs, otel_url + ); + + Self::new( + Duration::from_secs(flush_interval_secs), + otel_url, + ) + } + + /// Record a span for a specific service + /// + /// # Arguments + /// * `service_name` - Name of the service (e.g., "archgw(routing)", "archgw(llm)") + /// * `span` - The span to record + pub fn record_span(&self, service_name: impl Into, span: Span) { + let service_name = service_name.into(); + + // Use try_lock to avoid blocking in async contexts + // If the lock is held, we skip recording (telemetry shouldn't block the app) + if let Ok(mut spans_by_service) = self.spans_by_service.try_lock() { + // Get or create the queue for this service + let spans = spans_by_service + .entry(service_name) + .or_insert_with(VecDeque::new); + + spans.push_back(span); + } else { + // Lock contention - skip recording this span + debug!("Skipped span recording due to lock contention"); + } + // Flushing is handled by the periodic background flusher (see `start_background_flusher`). + } + + /// Flush all buffered spans to the OTEL collector + /// Builds ResourceSpans for each service with spans + pub async fn flush(&self) -> Result<(), Box> { + let mut spans_by_service = self.spans_by_service.lock().await; + + if spans_by_service.is_empty() { + return Ok(()); + } + + // Snapshot and drain all services' spans + let service_batches: Vec<(String, Vec)> = spans_by_service + .iter_mut() + .filter_map(|(service_name, spans)| { + if spans.is_empty() { + None + } else { + Some((service_name.clone(), spans.drain(..).collect())) + } + }) + .collect(); + + drop(spans_by_service); // Release lock before HTTP call + + if service_batches.is_empty() { + return Ok(()); + } + + let total_spans: usize = service_batches.iter().map(|(_, spans)| spans.len()).sum(); + debug!("Flushing {} spans across {} services to OTEL collector", total_spans, service_batches.len()); + + // Build canonical OTEL payload structure - one ResourceSpan per service + let resource_spans = self.build_resource_spans(service_batches); + + match self.send_to_otel(resource_spans).await { + Ok(_) => { + debug!("Successfully flushed {} spans", total_spans); + Ok(()) + } + Err(e) => { + warn!("Failed to send spans to OTEL collector: {:?}", e); + Err(e) + } + } + } + + /// Build OTEL-compliant resource spans from collected spans, one ResourceSpan per service + fn build_resource_spans(&self, service_batches: Vec<(String, Vec)>) -> Vec { + service_batches + .into_iter() + .map(|(service_name, spans)| { + ResourceSpanBuilder::new(&service_name) + .add_spans(spans) + .build() + }) + .collect() + } + + /// Send resource spans to OTEL collector + /// Serializes as {"resourceSpans": [...]} per OTEL spec + async fn send_to_otel( + &self, + resource_spans: Vec, + ) -> Result<(), Box> { + let client = reqwest::Client::new(); + + // Create OTEL payload with proper structure + let payload = serde_json::json!({ + "resourceSpans": resource_spans + }); + + let response = client + .post(&self.otel_url) + .header("Content-Type", "application/json") + .json(&payload) + .timeout(Duration::from_secs(5)) + .send() + .await?; + + if !response.status().is_success() { + warn!( + "OTEL collector returned non-success status: {}", + response.status() + ); + return Err(format!("OTEL collector error: {}", response.status()).into()); + } + + Ok(()) + } + + /// Start a background task that periodically flushes traces + /// Returns a join handle that can be used to stop the flusher + pub fn start_background_flusher(self: Arc) -> tokio::task::JoinHandle<()> { + let flush_interval = self.flush_interval; + + tokio::spawn(async move { + let mut ticker = interval(flush_interval); + + loop { + ticker.tick().await; + + if let Err(e) = self.flush().await { + error!("Background trace flush failed: {:?}", e); + } + } + }) + } + + /// Get current number of buffered spans across all services (for testing/monitoring) + pub async fn buffered_count(&self) -> usize { + self.spans_by_service + .lock() + .await + .values() + .map(|spans| spans.len()) + .sum() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::traces::SpanBuilder; + + #[tokio::test] + async fn test_collector_basic() { + let collector = TraceCollector::new( + Duration::from_secs(60), + "http://test:4318/v1/traces".to_string(), + ); + + let span = SpanBuilder::new("test_operation") + .with_trace_id("abc123") + .build(); + + collector.record_span("test-service", span); + + assert_eq!(collector.buffered_count().await, 1); + } + + #[tokio::test] + async fn test_collector_auto_flush() { + // Since batch-triggered flush behavior was removed, record two spans and verify both are buffered + let collector = Arc::new(TraceCollector::new( + Duration::from_secs(60), + "http://test:4318/v1/traces".to_string(), + )); + + let span1 = SpanBuilder::new("test1").build(); + let span2 = SpanBuilder::new("test2").build(); + + collector.record_span("test-service", span1); + collector.record_span("test-service", span2); + + // With no batch-triggered flush, both spans should remain buffered + assert_eq!(collector.buffered_count().await, 2); + } +} diff --git a/crates/common/src/traces/constants.rs b/crates/common/src/traces/constants.rs new file mode 100644 index 00000000..09bdecd5 --- /dev/null +++ b/crates/common/src/traces/constants.rs @@ -0,0 +1,27 @@ +/// OpenTelemetry semantic convention constants for tracing +/// +/// These constants ensure consistency across the codebase and prevent typos + +/// Resource attribute keys following OTEL semantic conventions +pub mod resource { + /// Logical name of the service + pub const SERVICE_NAME: &str = "service.name"; + + /// Version of the service + pub const SERVICE_VERSION: &str = "service.version"; + + /// Service namespace/environment + pub const SERVICE_NAMESPACE: &str = "service.namespace"; + + /// Service instance ID + pub const SERVICE_INSTANCE_ID: &str = "service.instance.id"; +} + +/// Instrumentation scope defaults +pub mod scope { + /// Default scope name for tracing instrumentation + pub const DEFAULT_NAME: &str = "brightstaff.tracing"; + + /// Default scope version + pub const DEFAULT_VERSION: &str = "1.0.0"; +} diff --git a/crates/common/src/traces/mod.rs b/crates/common/src/traces/mod.rs new file mode 100644 index 00000000..a8bc6ca5 --- /dev/null +++ b/crates/common/src/traces/mod.rs @@ -0,0 +1,23 @@ +// Original tracing types (OTEL structures) +mod shapes; +// New tracing utilities +mod span_builder; +mod resource_span_builder; +mod constants; + +#[cfg(feature = "trace-collection")] +mod collector; + +// Re-export original types +pub use shapes::{ + Span, Event, Traceparent, TraceparentNewError, + ResourceSpan, Resource, ScopeSpan, Scope, Attribute, AttributeValue, +}; + +// Re-export new utilities +pub use span_builder::{SpanBuilder, SpanKind}; +pub use resource_span_builder::ResourceSpanBuilder; +pub use constants::*; + +#[cfg(feature = "trace-collection")] +pub use collector::{TraceCollector, parse_traceparent}; diff --git a/crates/common/src/traces/resource_span_builder.rs b/crates/common/src/traces/resource_span_builder.rs new file mode 100644 index 00000000..3e0dd88f --- /dev/null +++ b/crates/common/src/traces/resource_span_builder.rs @@ -0,0 +1,121 @@ +use super::shapes::{ResourceSpan, Resource, ScopeSpan, Scope, Span, Attribute, AttributeValue}; +use super::constants::{resource, scope}; +use std::collections::HashMap; + +/// Builder for creating OTEL ResourceSpan structures +/// +/// Provides a fluent API for building the resource/scope/span hierarchy +pub struct ResourceSpanBuilder { + service_name: String, + resource_attributes: HashMap, + scope_name: String, + scope_version: String, + spans: Vec, +} + +impl ResourceSpanBuilder { + /// Create a new ResourceSpan builder with service name + pub fn new(service_name: impl Into) -> Self { + Self { + service_name: service_name.into(), + resource_attributes: HashMap::new(), + scope_name: scope::DEFAULT_NAME.to_string(), + scope_version: scope::DEFAULT_VERSION.to_string(), + spans: Vec::new(), + } + } + + /// Add a resource attribute (e.g., deployment.environment, host.name) + pub fn with_resource_attribute(mut self, key: impl Into, value: impl Into) -> Self { + self.resource_attributes.insert(key.into(), value.into()); + self + } + + /// Set the instrumentation scope name + pub fn with_scope_name(mut self, name: impl Into) -> Self { + self.scope_name = name.into(); + self + } + + /// Set the instrumentation scope version + pub fn with_scope_version(mut self, version: impl Into) -> Self { + self.scope_version = version.into(); + self + } + + /// Add a single span + pub fn add_span(mut self, span: Span) -> Self { + self.spans.push(span); + self + } + + /// Add multiple spans + pub fn add_spans(mut self, spans: Vec) -> Self { + self.spans.extend(spans); + self + } + + /// Build the ResourceSpan + pub fn build(self) -> ResourceSpan { + // Build resource attributes + let mut attributes = vec![ + Attribute { + key: resource::SERVICE_NAME.to_string(), + value: AttributeValue { + string_value: Some(self.service_name), + }, + } + ]; + + // Add custom resource attributes + for (key, value) in self.resource_attributes { + attributes.push(Attribute { + key, + value: AttributeValue { + string_value: Some(value), + }, + }); + } + + let resource = Resource { attributes }; + + let scope = Scope { + name: self.scope_name, + version: self.scope_version, + attributes: Vec::new(), + }; + + let scope_span = ScopeSpan { + scope, + spans: self.spans, + }; + + ResourceSpan { + resource, + scope_spans: vec![scope_span], + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::traces::SpanBuilder; + + #[test] + fn test_resource_span_builder() { + let span1 = SpanBuilder::new("operation1").build(); + let span2 = SpanBuilder::new("operation2").build(); + + let resource_span = ResourceSpanBuilder::new("test-service") + .with_resource_attribute("deployment.environment", "production") + .with_scope_name("test-scope") + .add_span(span1) + .add_span(span2) + .build(); + + assert_eq!(resource_span.resource.attributes.len(), 2); // service.name + custom + assert_eq!(resource_span.scope_spans.len(), 1); + assert_eq!(resource_span.scope_spans[0].spans.len(), 2); + } +} diff --git a/crates/common/src/traces/shapes.rs b/crates/common/src/traces/shapes.rs new file mode 100644 index 00000000..5f521767 --- /dev/null +++ b/crates/common/src/traces/shapes.rs @@ -0,0 +1,123 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct ResourceSpan { + pub resource: Resource, + #[serde(rename = "scopeSpans")] + pub scope_spans: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Resource { + pub attributes: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ScopeSpan { + pub scope: Scope, + pub spans: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Scope { + pub name: String, + pub version: String, + pub attributes: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Span { + #[serde(rename = "traceId")] + pub trace_id: String, + #[serde(rename = "spanId")] + pub span_id: String, + #[serde(rename = "parentSpanId")] + pub parent_span_id: Option, // Optional in case there's no parent span + pub name: String, + #[serde(rename = "startTimeUnixNano")] + pub start_time_unix_nano: String, + #[serde(rename = "endTimeUnixNano")] + pub end_time_unix_nano: String, + pub kind: u32, + pub attributes: Vec, + pub events: Option>, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Event { + #[serde(rename = "timeUnixNano")] + pub time_unix_nano: String, + pub name: String, + pub attributes: Vec, +} + +impl Event { + pub fn new(name: String, time_unix_nano: u128) -> Self { + Event { + time_unix_nano: format!("{}", time_unix_nano), + name, + attributes: Vec::new(), + } + } + + pub fn add_attribute(&mut self, key: String, value: String) { + self.attributes.push(Attribute { + key, + value: AttributeValue { + string_value: Some(value), + }, + }); + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Attribute { + pub key: String, + pub value: AttributeValue, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct AttributeValue { + #[serde(rename = "stringValue")] + pub string_value: Option, // Use Option to handle different value types +} + +pub struct Traceparent { + pub version: String, + pub trace_id: String, + pub parent_id: String, + pub flags: String, +} + +impl std::fmt::Display for Traceparent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}-{}-{}-{}", + self.version, self.trace_id, self.parent_id, self.flags + ) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum TraceparentNewError { + #[error("Invalid traceparent: \'{0}\'")] + InvalidTraceparent(String), +} + +impl TryFrom for Traceparent { + type Error = TraceparentNewError; + + fn try_from(traceparent: String) -> Result { + let traceparent_tokens: Vec<&str> = traceparent.split("-").collect::>(); + if traceparent_tokens.len() != 4 { + return Err(TraceparentNewError::InvalidTraceparent(traceparent)); + } + Ok(Traceparent { + version: traceparent_tokens[0].to_string(), + trace_id: traceparent_tokens[1].to_string(), + parent_id: traceparent_tokens[2].to_string(), + flags: traceparent_tokens[3].to_string(), + }) + } +} diff --git a/crates/common/src/traces/span_builder.rs b/crates/common/src/traces/span_builder.rs new file mode 100644 index 00000000..187c1678 --- /dev/null +++ b/crates/common/src/traces/span_builder.rs @@ -0,0 +1,193 @@ +use super::shapes::{Span, Attribute, AttributeValue}; +use std::collections::HashMap; +use std::time::SystemTime; + +/// OpenTelemetry span kinds +/// https://opentelemetry.io/docs/specs/otel/trace/api/#spankind +#[derive(Debug, Clone, Copy)] +pub enum SpanKind { + /// Default value. Indicates that the span represents an internal operation within an application + Internal = 0, + /// Indicates that the span describes a request to some remote service + Client = 3, +} + +/// Builder for creating OTEL-compliant spans with a fluent API +/// +/// This is the recommended way to create spans with proper trace context. +/// +/// # Example +/// ```no_run +/// use common::traces::{SpanBuilder, SpanKind}; +/// use std::time::SystemTime; +/// +/// let span = SpanBuilder::new("router_chat") +/// .with_trace_id("abc123") +/// .with_parent_span_id("parent456") +/// .with_kind(SpanKind::Internal) +/// .with_attribute("http.method", "POST") +/// .with_attribute("http.path", "/v1/chat/completions") +/// .build(); +/// ``` +pub struct SpanBuilder { + name: String, + trace_id: Option, + parent_span_id: Option, + start_time: SystemTime, + end_time: Option, + kind: SpanKind, + attributes: HashMap, +} + +impl SpanBuilder { + /// Create a new span builder + /// + /// # Arguments + /// * `name` - The operation name for this span (e.g., "router_chat", "determine_route") + pub fn new(name: impl Into) -> Self { + Self { + name: name.into(), + trace_id: None, + parent_span_id: None, + start_time: SystemTime::now(), + end_time: None, + kind: SpanKind::Internal, + attributes: HashMap::new(), + } + } + + /// Set the trace ID (extracted from traceparent or OpenTelemetry context) + pub fn with_trace_id(mut self, trace_id: impl Into) -> Self { + self.trace_id = Some(trace_id.into()); + self + } + + /// Set the parent span ID to link this span to its parent + pub fn with_parent_span_id(mut self, parent_span_id: impl Into) -> Self { + self.parent_span_id = Some(parent_span_id.into()); + self + } + + /// Set the span kind (defaults to Internal) + pub fn with_kind(mut self, kind: SpanKind) -> Self { + self.kind = kind; + self + } + + /// Set explicit start time (defaults to now) + pub fn with_start_time(mut self, start_time: SystemTime) -> Self { + self.start_time = start_time; + self + } + + /// Set explicit end time (defaults to build time) + pub fn with_end_time(mut self, end_time: SystemTime) -> Self { + self.end_time = Some(end_time); + self + } + + /// Add a single attribute to the span + pub fn with_attribute(mut self, key: impl Into, value: impl Into) -> Self { + self.attributes.insert(key.into(), value.into()); + self + } + + /// Add multiple attributes at once + pub fn with_attributes(mut self, attrs: HashMap) -> Self { + self.attributes.extend(attrs); + self + } + + /// Build the span, consuming the builder + /// + /// Creates a complete OTEL-compliant span with all provided attributes, + /// generating span_id and using provided or random trace_id. + pub fn build(self) -> Span { + let end_time = self.end_time.unwrap_or_else(SystemTime::now); + + let start_nanos = system_time_to_nanos(self.start_time); + let end_nanos = system_time_to_nanos(end_time); + + // Generate trace_id if not provided + let trace_id = self.trace_id.unwrap_or_else(|| generate_random_trace_id()); + + // Create attributes in OTEL format + let attributes: Vec = self.attributes + .into_iter() + .map(|(key, value)| Attribute { + key, + value: AttributeValue { + string_value: Some(value), + }, + }) + .collect(); + + // Build span directly without going through Span::new() + Span { + trace_id, + span_id: generate_random_span_id(), + parent_span_id: self.parent_span_id, + name: self.name, + start_time_unix_nano: format!("{}", start_nanos), + end_time_unix_nano: format!("{}", end_nanos), + kind: self.kind as u32, + attributes, + events: None, + } + } +} + +/// Convert SystemTime to nanoseconds since UNIX epoch for OTEL +fn system_time_to_nanos(time: SystemTime) -> u128 { + time.duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() +} + +/// Generate a random span ID (16 hex characters = 8 bytes) +fn generate_random_span_id() -> String { + use rand::RngCore; + let mut rng = rand::thread_rng(); + let mut random_bytes = [0u8; 8]; + rng.fill_bytes(&mut random_bytes); + hex::encode(random_bytes) +} + +/// Generate a random trace ID (32 hex characters = 16 bytes) +fn generate_random_trace_id() -> String { + use rand::RngCore; + let mut rng = rand::thread_rng(); + let mut random_bytes = [0u8; 16]; + rng.fill_bytes(&mut random_bytes); + hex::encode(random_bytes) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_span_builder_basic() { + let span = SpanBuilder::new("test_operation") + .with_trace_id("abc123") + .with_parent_span_id("parent123") + .with_attribute("key", "value") + .build(); + + assert_eq!(span.name, "test_operation"); + assert_eq!(span.trace_id, "abc123"); + assert_eq!(span.parent_span_id, Some("parent123".to_string())); + assert_eq!(span.attributes.len(), 1); + } + + #[test] + fn test_span_builder_no_parent() { + let span = SpanBuilder::new("root_span") + .with_trace_id("xyz789") + .build(); + + assert_eq!(span.name, "root_span"); + assert_eq!(span.trace_id, "xyz789"); + assert_eq!(span.parent_span_id, None); + } +} diff --git a/crates/hermesllm/src/apis/amazon_bedrock.rs b/crates/hermesllm/src/apis/amazon_bedrock.rs index 252bd0f1..8afd8af0 100644 --- a/crates/hermesllm/src/apis/amazon_bedrock.rs +++ b/crates/hermesllm/src/apis/amazon_bedrock.rs @@ -200,6 +200,17 @@ impl ProviderRequest for ConverseRequest { }) } + fn get_tool_names(&self) -> Option> { + self.tool_config.as_ref()?.tools.as_ref().map(|tools| { + tools + .iter() + .filter_map(|tool| match tool { + Tool::ToolSpec { tool_spec } => Some(tool_spec.name.clone()), + }) + .collect() + }) + } + fn to_bytes(&self) -> Result, ProviderRequestError> { serde_json::to_vec(self).map_err(|e| ProviderRequestError { message: format!("Failed to serialize Bedrock request: {}", e), diff --git a/crates/hermesllm/src/apis/anthropic.rs b/crates/hermesllm/src/apis/anthropic.rs index 7e1951e4..c3c8bc1a 100644 --- a/crates/hermesllm/src/apis/anthropic.rs +++ b/crates/hermesllm/src/apis/anthropic.rs @@ -513,6 +513,12 @@ impl ProviderRequest for MessagesRequest { None } + fn get_tool_names(&self) -> Option> { + self.tools.as_ref().map(|tools| { + tools.iter().map(|tool| tool.name.clone()).collect() + }) + } + fn to_bytes(&self) -> Result, ProviderRequestError> { serde_json::to_vec(self).map_err(|e| ProviderRequestError { message: format!("Failed to serialize MessagesRequest: {}", e), diff --git a/crates/hermesllm/src/apis/openai.rs b/crates/hermesllm/src/apis/openai.rs index d7f7a07d..0bfd43c2 100644 --- a/crates/hermesllm/src/apis/openai.rs +++ b/crates/hermesllm/src/apis/openai.rs @@ -687,6 +687,32 @@ impl ProviderRequest for ChatCompletionsRequest { }) } + fn get_tool_names(&self) -> Option> { + // First check the 'tools' field (current API) + if let Some(tools) = &self.tools { + let names: Vec = tools + .iter() + .map(|tool| tool.function.name.clone()) + .collect(); + if !names.is_empty() { + return Some(names); + } + } + + // Fallback to 'functions' field (deprecated but still supported) + if let Some(functions) = &self.functions { + let names: Vec = functions + .iter() + .map(|func| func.function.name.clone()) + .collect(); + if !names.is_empty() { + return Some(names); + } + } + + None + } + fn to_bytes(&self) -> Result, ProviderRequestError> { serde_json::to_vec(&self).map_err(|e| ProviderRequestError { message: format!("Failed to serialize OpenAI request: {}", e), diff --git a/crates/hermesllm/src/apis/openai_responses.rs b/crates/hermesllm/src/apis/openai_responses.rs index 4f0cf663..fa9976e3 100644 --- a/crates/hermesllm/src/apis/openai_responses.rs +++ b/crates/hermesllm/src/apis/openai_responses.rs @@ -1063,6 +1063,19 @@ impl ProviderRequest for ResponsesAPIRequest { } } + fn get_tool_names(&self) -> Option> { + self.tools.as_ref().map(|tools| { + tools + .iter() + .filter_map(|tool| match tool { + Tool::Function { name, .. } => Some(name.clone()), + // Other tool types don't have user-defined names + _ => None, + }) + .collect() + }) + } + fn to_bytes(&self) -> Result, ProviderRequestError> { serde_json::to_vec(&self).map_err(|e| ProviderRequestError { message: format!("Failed to serialize Responses API request: {}", e), diff --git a/crates/hermesllm/src/providers/request.rs b/crates/hermesllm/src/providers/request.rs index daeebe70..c087398f 100644 --- a/crates/hermesllm/src/providers/request.rs +++ b/crates/hermesllm/src/providers/request.rs @@ -35,6 +35,9 @@ pub trait ProviderRequest: Send + Sync { /// Extract the user message for tracing/logging purposes fn get_recent_user_message(&self) -> Option; + /// Get tool names if tools are defined in the request + fn get_tool_names(&self) -> Option>; + /// Convert the request to bytes for transmission fn to_bytes(&self) -> Result, ProviderRequestError>; @@ -95,6 +98,16 @@ impl ProviderRequest for ProviderRequestType { } } + fn get_tool_names(&self) -> Option> { + match self { + Self::ChatCompletionsRequest(r) => r.get_tool_names(), + Self::MessagesRequest(r) => r.get_tool_names(), + Self::BedrockConverse(r) => r.get_tool_names(), + Self::BedrockConverseStream(r) => r.get_tool_names(), + Self::ResponsesAPIRequest(r) => r.get_tool_names(), + } + } + fn to_bytes(&self) -> Result, ProviderRequestError> { match self { Self::ChatCompletionsRequest(r) => r.to_bytes(), diff --git a/crates/llm_gateway/src/filter_context.rs b/crates/llm_gateway/src/filter_context.rs index 2b8e1a95..3a1f7b84 100644 --- a/crates/llm_gateway/src/filter_context.rs +++ b/crates/llm_gateway/src/filter_context.rs @@ -2,26 +2,18 @@ use crate::metrics::Metrics; use crate::stream_context::StreamContext; use common::configuration::Configuration; use common::configuration::Overrides; -use common::consts::OTEL_COLLECTOR_HTTP; -use common::consts::OTEL_POST_PATH; -use common::http::CallArgs; use common::http::Client; use common::llm_providers::LlmProviders; use common::ratelimit; use common::stats::Gauge; -use common::tracing::TraceData; use log::trace; -use log::warn; use proxy_wasm::traits::*; use proxy_wasm::types::*; use std::cell::RefCell; use std::collections::HashMap; -use std::collections::VecDeque; use std::rc::Rc; use std::time::Duration; -use std::sync::{Arc, Mutex}; - #[derive(Debug)] pub struct CallContext {} @@ -31,7 +23,6 @@ pub struct FilterContext { // callouts stores token_id to request mapping that we use during #on_http_call_response to match the response to the request. callouts: RefCell>, llm_providers: Option>, - traces_queue: Arc>>, overrides: Rc>, } @@ -41,7 +32,6 @@ impl FilterContext { callouts: RefCell::new(HashMap::new()), metrics: Rc::new(Metrics::new()), llm_providers: None, - traces_queue: Arc::new(Mutex::new(VecDeque::new())), overrides: Rc::new(None), } } @@ -95,7 +85,6 @@ impl RootContext for FilterContext { .as_ref() .expect("LLM Providers must exist when Streams are being created"), ), - Arc::clone(&self.traces_queue), Rc::clone(&self.overrides), ))) } @@ -108,34 +97,6 @@ impl RootContext for FilterContext { self.set_tick_period(Duration::from_secs(1)); true } - - fn on_tick(&mut self) { - let _ = self.traces_queue.try_lock().map(|mut traces_queue| { - while let Some(trace) = traces_queue.pop_front() { - let trace_str = serde_json::to_string(&trace).unwrap(); - trace!("trace details: {}", trace_str); - let call_args = CallArgs::new( - OTEL_COLLECTOR_HTTP, - OTEL_POST_PATH, - vec![ - (":method", http::Method::POST.as_str()), - (":path", OTEL_POST_PATH), - (":authority", OTEL_COLLECTOR_HTTP), - ("content-type", "application/json"), - ], - Some(trace_str.as_bytes()), - vec![], - Duration::from_secs(60), - ); - if let Err(error) = self.http_call(call_args, CallContext {}) { - warn!( - "failed to schedule http call to otel-collector: {:?}", - error - ); - } - } - }); - } } impl Context for FilterContext { diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 42d7cb31..fbbf6c28 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -4,10 +4,8 @@ use log::{debug, info, warn}; use proxy_wasm::hostcalls::get_current_time; use proxy_wasm::traits::*; use proxy_wasm::types::*; -use std::collections::VecDeque; use std::num::NonZero; use std::rc::Rc; -use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crate::metrics::Metrics; @@ -20,7 +18,6 @@ use common::errors::ServerError; use common::llm_providers::LlmProviders; use common::ratelimit::Header; use common::stats::{IncrementingMetric, RecordingMetric}; -use common::tracing::{Event, Span, TraceData, Traceparent}; use common::{ratelimit, routing, tokenizer}; use hermesllm::apis::streaming_shapes::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder; use hermesllm::apis::streaming_shapes::sse::{ @@ -51,7 +48,6 @@ pub struct StreamContext { ttft_time: Option, traceparent: Option, request_body_sent_time: Option, - traces_queue: Arc>>, overrides: Rc>, user_message: Option, upstream_status_code: Option, @@ -65,7 +61,6 @@ impl StreamContext { pub fn new( metrics: Rc, llm_providers: Rc, - traces_queue: Arc>>, overrides: Rc>, ) -> Self { StreamContext { @@ -83,7 +78,6 @@ impl StreamContext { ttft_duration: None, traceparent: None, ttft_time: None, - traces_queue, request_body_sent_time: None, user_message: None, upstream_status_code: None, @@ -333,68 +327,6 @@ impl StreamContext { self.metrics .output_sequence_length .record(self.response_tokens as u64); - - if let Some(traceparent) = self.traceparent.as_ref() { - let current_time_ns = current_time_ns(); - - match Traceparent::try_from(traceparent.to_string()) { - Err(e) => { - warn!("traceparent header is invalid: {}", e); - } - Ok(traceparent) => { - let service_name = match &self.resolved_api { - Some(api) => { - let api_display = api.to_string(); - format!("archgw.{}", api_display) - } - None => "archgw".to_string(), - }; - - let mut trace_data = - common::tracing::TraceData::new_with_service_name(service_name); - let mut llm_span = Span::new( - self.llm_provider().name.to_string(), - Some(traceparent.trace_id), - Some(traceparent.parent_id), - self.request_body_sent_time.unwrap(), - current_time_ns, - ); - llm_span - .add_attribute("model".to_string(), self.llm_provider().name.to_string()); - - if let Some(user_message) = &self.user_message { - llm_span.add_attribute("message".to_string(), user_message.clone()); - } - - // Add HTTP attributes - if let Some(method) = &self.http_method { - llm_span.add_attribute("http.method".to_string(), method.clone()); - } - if let Some(protocol) = &self.http_protocol { - llm_span.add_attribute("http.protocol".to_string(), protocol.clone()); - } - if let Some(status_code) = &self.upstream_status_code { - llm_span.add_attribute( - "http.status_code".to_string(), - status_code.as_u16().to_string(), - ); - } - - // Add request ID attribute - llm_span - .add_attribute("http.request_id".to_string(), self.request_identifier()); - - if self.ttft_time.is_some() { - llm_span.add_event(Event::new( - "time_to_first_token".to_string(), - self.ttft_time.unwrap(), - )); - } - trace_data.add_span(llm_span); - self.traces_queue.lock().unwrap().push_back(trace_data); - } - }; - } } fn read_raw_response_body(&mut self, body_size: usize) -> Result, Action> { diff --git a/demos/samples_python/currency_exchange/arch_config.yaml b/demos/samples_python/currency_exchange/arch_config.yaml index 1c399449..61a85b84 100644 --- a/demos/samples_python/currency_exchange/arch_config.yaml +++ b/demos/samples_python/currency_exchange/arch_config.yaml @@ -8,8 +8,21 @@ listeners: timeout: 30s llm_providers: - - access_key: $OPENAI_API_KEY - model: openai/gpt-4o + - model: openai/gpt-4o-mini + access_key: $OPENAI_API_KEY + default: true + + - model: openai/gpt-4o + access_key: $OPENAI_API_KEY + routing_preferences: + - name: code understanding + description: understand and explain existing code snippets, functions, or libraries + + - model: anthropic/claude-sonnet-4-20250514 + access_key: $ANTHROPIC_API_KEY + routing_preferences: + - name: code generation + description: generating new code snippets, functions, or boilerplate based on user prompts or requirements endpoints: frankfurther_api: