diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml index e9b60756..80388b7a 100644 --- a/arch/envoy.template.yaml +++ b/arch/envoy.template.yaml @@ -469,6 +469,48 @@ static_resources: typed_config: "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + - name: otel_collector_proxy + address: + socket_address: + address: 0.0.0.0 + port_value: 9903 + traffic_direction: OUTBOUND + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: otel_proxy + codec_type: AUTO + access_log: + - name: envoy.access_loggers.file + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "/var/log/access_otel.log" + format: | + [%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%" + route_config: + name: otel_route + virtual_hosts: + - name: otel_backend + domains: ["*"] + routes: + - match: + prefix: "/v1/traces" + route: + cluster: otel_collector_http_proxy + timeout: 5s + retry_policy: + retry_on: "5xx,connect-failure,refused-stream,reset" + num_retries: 3 + per_try_timeout: 2s + host_selection_retry_max_attempts: 5 + retriable_status_codes: [500, 502, 503, 504] + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + - name: egress_traffic_llm address: socket_address: @@ -1033,4 +1075,34 @@ static_resources: socket_address: address: host.docker.internal port_value: 4318 + # Cluster for OTEL HTTP proxy with retry/circuit breaking + - name: otel_collector_http_proxy + connect_timeout: 2s + type: STRICT_DNS + dns_lookup_family: V4_ONLY + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: otel_collector_http_proxy + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: host.docker.internal + port_value: 4318 + # Circuit breaker configuration to prevent overwhelming OTEL collector + circuit_breakers: + thresholds: + - priority: DEFAULT + max_connections: 100 + max_pending_requests: 100 + max_requests: 100 + max_retries: 3 + # Health checking and outlier detection + outlier_detection: + consecutive_5xx: 5 + interval: 10s + base_ejection_time: 30s + max_ejection_percent: 50 + enforcing_consecutive_5xx: 100 {% endif %} diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 2b8b30b6..4f76f5df 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -89,9 +89,18 @@ async fn main() -> Result<(), Box> { let model_aliases = Arc::new(arch_config.model_aliases.clone()); // Initialize trace collector and start background flusher - let trace_collector = Arc::new(TraceCollector::from_env()); + // Tracing is enabled if the tracing config is present in arch_config.yaml + // Pass Some(true/false) to override, or None to use env var OTEL_TRACING_ENABLED + let tracing_enabled = if arch_config.tracing.is_some() { + info!("Tracing configuration found in arch_config.yaml"); + Some(true) + } else { + info!("No tracing configuration in arch_config.yaml, will check OTEL_TRACING_ENABLED env var"); + None + }; + let trace_collector = Arc::new(TraceCollector::new(tracing_enabled)); let _flusher_handle = trace_collector.clone().start_background_flusher(); - info!("Trace collector initialized and background flusher started"); + loop { let (stream, _) = listener.accept().await?; diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 62e06da4..f0def5d9 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -33,3 +33,4 @@ trace-collection = ["tokio", "reqwest", "tracing"] [dev-dependencies] pretty_assertions = "1.4.1" serde_json = "1.0.64" +tokio = { version = "1.44", features = ["sync", "time", "macros", "rt"] } diff --git a/crates/common/src/traces/collector.rs b/crates/common/src/traces/collector.rs index cd199866..072075a0 100644 --- a/crates/common/src/traces/collector.rs +++ b/crates/common/src/traces/collector.rs @@ -25,6 +25,12 @@ pub fn parse_traceparent(traceparent: &str) -> (String, String) { /// /// Supports multiple services, with each service (e.g., "archgw(routing)", "archgw(llm)") /// maintaining its own span queue. Flushes all services together periodically. +/// +/// Tracing can be enabled/disabled in two ways: +/// 1. Via arch_config.yaml: presence of `tracing` configuration section +/// 2. Via environment variable: `OTEL_TRACING_ENABLED=true/false` +/// +/// When disabled, span recording and flushing are no-ops. pub struct TraceCollector { /// Spans grouped by service name /// Key: service name (e.g., "archgw(routing)", "archgw(llm)") @@ -32,48 +38,53 @@ pub struct TraceCollector { spans_by_service: Arc>>>, flush_interval: Duration, otel_url: String, + /// Whether tracing is enabled + enabled: bool, } impl TraceCollector { /// Create a new trace collector + /// /// # Arguments - /// * `flush_interval` - How often to flush buffered spans - /// * `otel_url` - OTEL collector endpoint URL - pub fn new( - flush_interval: Duration, - otel_url: String, - ) -> Self { - Self { - spans_by_service: Arc::new(Mutex::new(HashMap::new())), - flush_interval, - otel_url, - } - } - - /// Create with defaults from environment or sensible defaults - pub fn from_env() -> Self { - let batch_size = std::env::var("TRACE_BATCH_SIZE") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(100); - + /// * `enabled` - Whether tracing is enabled + /// - `Some(true)` - Force enable tracing + /// - `Some(false)` - Force disable tracing + /// - `None` - Check `OTEL_TRACING_ENABLED` env var (defaults to true if not set) + /// + /// Other parameters are read from environment variables: + /// - `TRACE_FLUSH_INTERVAL_SECS` - Flush interval in seconds (default: 1) + /// - `OTEL_COLLECTOR_URL` - OTEL collector endpoint (default: http://localhost:9903/v1/traces) + pub fn new(enabled: Option) -> Self { let flush_interval_secs = std::env::var("TRACE_FLUSH_INTERVAL_SECS") .ok() .and_then(|s| s.parse().ok()) .unwrap_or(1); let otel_url = std::env::var("OTEL_COLLECTOR_URL") - .unwrap_or_else(|_| "http://host.docker.internal:4318/v1/traces".to_string()); + .unwrap_or_else(|_| "http://localhost:9903/v1/traces".to_string()); + + // Determine if tracing is enabled: + // 1. Use explicit parameter if provided + // 2. Otherwise check OTEL_TRACING_ENABLED env var + // 3. Default to true if neither is set + let enabled = enabled.unwrap_or_else(|| { + std::env::var("OTEL_TRACING_ENABLED") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(true) + }); debug!( - "TraceCollector initialized: batch_size={}, flush_interval={}s, url={}", - batch_size, flush_interval_secs, otel_url + "TraceCollector initialized: flush_interval={}s, url={}, enabled={}", + flush_interval_secs, otel_url, enabled ); - Self::new( - Duration::from_secs(flush_interval_secs), + Self { + spans_by_service: Arc::new(Mutex::new(HashMap::new())), + flush_interval: Duration::from_secs(flush_interval_secs), otel_url, - ) + enabled, + } } /// Record a span for a specific service @@ -82,6 +93,11 @@ impl TraceCollector { /// * `service_name` - Name of the service (e.g., "archgw(routing)", "archgw(llm)") /// * `span` - The span to record pub fn record_span(&self, service_name: impl Into, span: Span) { + // Skip recording if tracing is disabled + if !self.enabled { + return; + } + let service_name = service_name.into(); // Use try_lock to avoid blocking in async contexts @@ -103,6 +119,11 @@ impl TraceCollector { /// Flush all buffered spans to the OTEL collector /// Builds ResourceSpans for each service with spans pub async fn flush(&self) -> Result<(), Box> { + // Skip flushing if tracing is disabled + if !self.enabled { + return Ok(()); + } + let mut spans_by_service = self.spans_by_service.lock().await; if spans_by_service.is_empty() { @@ -225,10 +246,7 @@ mod tests { #[tokio::test] async fn test_collector_basic() { - let collector = TraceCollector::new( - Duration::from_secs(60), - "http://test:4318/v1/traces".to_string(), - ); + let collector = TraceCollector::new(Some(true)); let span = SpanBuilder::new("test_operation") .with_trace_id("abc123") @@ -242,10 +260,7 @@ mod tests { #[tokio::test] async fn test_collector_auto_flush() { // Since batch-triggered flush behavior was removed, record two spans and verify both are buffered - let collector = Arc::new(TraceCollector::new( - Duration::from_secs(60), - "http://test:4318/v1/traces".to_string(), - )); + let collector = Arc::new(TraceCollector::new(Some(true))); let span1 = SpanBuilder::new("test1").build(); let span2 = SpanBuilder::new("test2").build();