From c999b36c41d8eb6f4c89598414c4452d58f4a6d7 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Sat, 7 Feb 2026 13:08:32 -0800 Subject: [PATCH] remove unused trace collector and fix env var --- .github/workflows/rust_tests.yml | 3 - archgw.code-workspace | 3 +- cli/planoai/main.py | 2 +- config/docker-compose.dev.yaml | 2 +- config/envoy.template.yaml | 44 --- crates/.vscode/launch.json | 2 +- crates/Cargo.lock | 2 - crates/Cargo.toml | 4 - crates/brightstaff/Cargo.toml | 2 +- .../src/handlers/agent_chat_completions.rs | 2 +- .../src/handlers/pipeline_processor.rs | 2 +- crates/brightstaff/src/utils/tracing.rs | 2 +- crates/common/Cargo.toml | 6 - crates/common/src/consts.rs | 1 - crates/common/src/traces/collector.rs | 292 --------------- crates/common/src/traces/mod.rs | 9 - .../src/traces/tests/mock_otel_collector.rs | 96 ----- crates/common/src/traces/tests/mod.rs | 4 - .../traces/tests/trace_integration_test.rs | 342 ------------------ .../use_cases/llm_routing/docker-compose.yaml | 2 +- .../docker-compose.yaml | 2 +- .../docker-compose.yaml | 38 +- tests/e2e/docker-compose.yaml | 2 +- tests/rest/tracing.rest | 31 -- 24 files changed, 31 insertions(+), 864 deletions(-) delete mode 100644 crates/common/src/traces/collector.rs delete mode 100644 crates/common/src/traces/tests/mock_otel_collector.rs delete mode 100644 crates/common/src/traces/tests/mod.rs delete mode 100644 crates/common/src/traces/tests/trace_integration_test.rs delete mode 100644 tests/rest/tracing.rest diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index e871745c..d72e7af4 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -29,6 +29,3 @@ jobs: - name: Run unit tests run: cargo test --lib - - - name: Run trace integration tests - run: cargo test -p common --features trace-collection traces::tests::trace_integration_test diff --git a/archgw.code-workspace b/archgw.code-workspace index bd42f64a..366191ef 100644 --- a/archgw.code-workspace +++ b/archgw.code-workspace @@ -25,8 +25,7 @@ "[python]": { "editor.defaultFormatter": "ms-python.black-formatter", "editor.formatOnSave": true - }, - "rust-analyzer.cargo.features": ["trace-collection"] + } }, "extensions": { "recommendations": [ diff --git a/cli/planoai/main.py b/cli/planoai/main.py index 9157373c..d0ef74c3 100644 --- a/cli/planoai/main.py +++ b/cli/planoai/main.py @@ -153,7 +153,7 @@ def up(file, path, foreground): # Set the ARCH_CONFIG_FILE environment variable env_stage = { - "OTEL_TRACING_HTTP_ENDPOINT": "http://host.docker.internal:4318/v1/traces", + "OTEL_TRACING_GRPC_ENDPOINT": "http://host.docker.internal:4317", } env = os.environ.copy() # Remove PATH variable if present diff --git a/config/docker-compose.dev.yaml b/config/docker-compose.dev.yaml index c683cba3..2e061939 100644 --- a/config/docker-compose.dev.yaml +++ b/config/docker-compose.dev.yaml @@ -21,4 +21,4 @@ services: environment: - OPENAI_API_KEY=${OPENAI_API_KEY:?error} - MISTRAL_API_KEY=${MISTRAL_API_KEY:?error} - - OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces + - OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317 diff --git a/config/envoy.template.yaml b/config/envoy.template.yaml index f29c5964..721a3a7c 100644 --- a/config/envoy.template.yaml +++ b/config/envoy.template.yaml @@ -470,50 +470,6 @@ static_resources: typed_config: "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router -{% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} - - name: otel_collector_proxy - address: - socket_address: - address: 127.0.0.1 - port_value: 9903 - traffic_direction: OUTBOUND - filter_chains: - - filters: - - name: envoy.filters.network.http_connection_manager - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - stat_prefix: otel_proxy - codec_type: AUTO - # access_log: - # - name: envoy.access_loggers.file - # typed_config: - # "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog - # path: "/var/log/access_otel.log" - # format: | - # [%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%" - route_config: - name: otel_route - virtual_hosts: - - name: otel_backend - domains: ["*"] - routes: - - match: - prefix: "/v1/traces" - route: - cluster: opentelemetry_collector_http - timeout: 5s - retry_policy: - retry_on: "5xx,connect-failure,refused-stream,reset" - num_retries: 3 - per_try_timeout: 2s - host_selection_retry_max_attempts: 5 - retriable_status_codes: [500, 502, 503, 504] - http_filters: - - name: envoy.filters.http.router - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router -{% endif %} - - name: egress_traffic_llm address: socket_address: diff --git a/crates/.vscode/launch.json b/crates/.vscode/launch.json index 68b1da3b..1dc48199 100644 --- a/crates/.vscode/launch.json +++ b/crates/.vscode/launch.json @@ -15,7 +15,7 @@ "RUST_BACKTRACE": "1", "ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/multi_agent_with_crewai_langchain/config.yaml_rendered", // "ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/preference_based_routing/config.yaml_rendered", - "OTEL_COLLECTOR_URL": "http://localhost:4317", + "OTEL_TRACING_GRPC_ENDPOINT": "http://localhost:4317", "OTEL_TRACING_ENABLED": "true" }, "preLaunchTask": "rust: cargo build" diff --git a/crates/Cargo.lock b/crates/Cargo.lock index 5b8c9a22..f2744ad2 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -445,7 +445,6 @@ dependencies = [ "pretty_assertions", "proxy-wasm", "rand 0.8.5", - "reqwest", "serde", "serde_json", "serde_with", @@ -454,7 +453,6 @@ dependencies = [ "thiserror 1.0.69", "tiktoken-rs", "tokio", - "tracing", "url", "urlencoding", ] diff --git a/crates/Cargo.toml b/crates/Cargo.toml index c22e252e..5cd6b29c 100644 --- a/crates/Cargo.toml +++ b/crates/Cargo.toml @@ -1,7 +1,3 @@ [workspace] resolver = "2" members = ["llm_gateway", "prompt_gateway", "common", "brightstaff", "hermesllm"] - -[workspace.metadata.rust-analyzer] -# Enable features for better IDE support -cargo.features = ["trace-collection"] diff --git a/crates/brightstaff/Cargo.toml b/crates/brightstaff/Cargo.toml index 2b1f7d36..5d986ffa 100644 --- a/crates/brightstaff/Cargo.toml +++ b/crates/brightstaff/Cargo.toml @@ -8,7 +8,7 @@ async-openai = "0.30.1" async-trait = "0.1" bytes = "1.10.1" chrono = "0.4" -common = { version = "0.1.0", path = "../common", features = ["trace-collection"] } +common = { version = "0.1.0", path = "../common" } eventsource-client = "0.15.0" eventsource-stream = "0.2.3" flate2 = "1.0" diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index b9579fad..99a1f2e9 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -177,7 +177,7 @@ async fn handle_agent_chat_inner( }; get_active_span(|span| { - span.update_name(format!("(orchestrator) {}", listener.name)); + span.update_name(listener.name.to_string()); }); info!(listener = %listener.name, "handling request"); diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index 9122d6ba..a83bf4cb 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -643,7 +643,7 @@ impl PipelineProcessor { let request_url = "/v1/chat/completions"; get_active_span(|span| { - span.update_name(format!("(agent) {} {}", terminal_agent.id, request_url)); + span.update_name(format!("{} {}", terminal_agent.id, request_url)); }); let request_body = ProviderRequestType::to_bytes(&original_request).unwrap(); diff --git a/crates/brightstaff/src/utils/tracing.rs b/crates/brightstaff/src/utils/tracing.rs index 63060686..eda78c43 100644 --- a/crates/brightstaff/src/utils/tracing.rs +++ b/crates/brightstaff/src/utils/tracing.rs @@ -85,7 +85,7 @@ pub fn init_tracer() -> &'static SdkTracerProvider { global::set_text_map_propagator(TraceContextPropagator::new()); // Get OTEL collector URL from environment - let otel_endpoint = std::env::var("OTEL_COLLECTOR_URL") + let otel_endpoint = std::env::var("OTEL_TRACING_GRPC_ENDPOINT") .unwrap_or_else(|_| "http://localhost:4317".to_string()); let tracing_enabled = std::env::var("OTEL_TRACING_ENABLED") diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index bcff9b32..cb471bd6 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -21,14 +21,8 @@ url = "2.5.4" hermesllm = { version = "0.1.0", path = "../hermesllm" } serde_with = "3.13.0" -# Optional dependencies for trace collection (not available in WASM) -tokio = { version = "1.44", features = ["sync", "time"], optional = true } -reqwest = { version = "0.12", features = ["json"], optional = true } -tracing = { version = "0.1", optional = true } - [features] default = [] -trace-collection = ["tokio", "reqwest", "tracing"] [dev-dependencies] pretty_assertions = "1.4.1" diff --git a/crates/common/src/consts.rs b/crates/common/src/consts.rs index 23260747..cafc8e80 100644 --- a/crates/common/src/consts.rs +++ b/crates/common/src/consts.rs @@ -30,7 +30,6 @@ pub const ARCH_MODEL_PREFIX: &str = "Arch"; pub const HALLUCINATION_TEMPLATE: &str = "It seems I'm missing some information. Could you provide the following details "; pub const OTEL_COLLECTOR_HTTP: &str = "opentelemetry_collector_http"; -pub const OTEL_POST_PATH: &str = "/v1/traces"; pub const LLM_ROUTE_HEADER: &str = "x-arch-llm-route"; pub const ENVOY_RETRY_HEADER: &str = "x-envoy-max-retries"; pub const BRIGHT_STAFF_SERVICE_NAME: &str = "brightstaff"; diff --git a/crates/common/src/traces/collector.rs b/crates/common/src/traces/collector.rs deleted file mode 100644 index e26f544e..00000000 --- a/crates/common/src/traces/collector.rs +++ /dev/null @@ -1,292 +0,0 @@ -use super::resource_span_builder::ResourceSpanBuilder; -use super::shapes::Span; -use std::collections::{HashMap, VecDeque}; -use std::sync::Arc; -use tokio::sync::Mutex; -use tokio::time::{interval, Duration}; -use tracing::{debug, error, warn}; - -/// Parse W3C traceparent header into trace_id and parent_span_id -/// Format: "00-{trace_id}-{parent_span_id}-01" -/// -/// Returns (trace_id, Option) -/// - parent_span_id is None if it's all zeros (0000000000000000), indicating a root span -pub fn parse_traceparent(traceparent: &str) -> (String, Option) { - let parts: Vec<&str> = traceparent.split('-').collect(); - if parts.len() == 4 { - let trace_id = parts[1].to_string(); - let parent_span_id = parts[2].to_string(); - - // If parent_span_id is all zeros, this is a root span with no parent - let parent = if parent_span_id == "0000000000000000" { - None - } else { - Some(parent_span_id) - }; - - (trace_id, parent) - } else { - warn!("Invalid traceparent format: {}", traceparent); - // Return empty trace ID and None for parent if parsing fails - (String::new(), None) - } -} - -/// Collects and batches spans, flushing them to an OTEL collector -/// -/// Supports multiple services, with each service (e.g., "archgw(routing)", "archgw(llm)") -/// maintaining its own span queue. Flushes all services together periodically. -/// -/// Tracing can be enabled/disabled in two ways: -/// 1. Via arch_config.yaml: presence of `tracing` configuration section -/// 2. Via environment variable: `OTEL_TRACING_ENABLED=true/false` -/// -/// When disabled, span recording and flushing are no-ops. -pub struct TraceCollector { - /// Spans grouped by service name - /// Key: service name (e.g., "archgw(routing)", "archgw(llm)") - /// Value: queue of spans for that service - spans_by_service: Arc>>>, - flush_interval: Duration, - otel_url: String, - /// Whether tracing is enabled - enabled: bool, -} - -impl TraceCollector { - /// Create a new trace collector - /// - /// # Arguments - /// * `enabled` - Whether tracing is enabled - /// - `Some(true)` - Force enable tracing - /// - `Some(false)` - Force disable tracing - /// - `None` - Check `OTEL_TRACING_ENABLED` env var (defaults to true if not set) - /// - /// Other parameters are read from environment variables: - /// - `TRACE_FLUSH_INTERVAL_MS` - Flush interval in milliseconds (default: 1000) - /// - `OTEL_COLLECTOR_URL` - OTEL collector endpoint (default: http://localhost:9903/v1/traces) - pub fn new(enabled: Option) -> Self { - let flush_interval_ms = std::env::var("TRACE_FLUSH_INTERVAL_MS") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(1000); - - let otel_url = std::env::var("OTEL_COLLECTOR_URL") - .unwrap_or_else(|_| "http://localhost:9903/v1/traces".to_string()); - - // Determine if tracing is enabled: - // 1. Use explicit parameter if provided - // 2. Otherwise check OTEL_TRACING_ENABLED env var - // 3. Default to false if neither is set (tracing opt-in, not opt-out) - let enabled = enabled.unwrap_or_else(|| { - std::env::var("OTEL_TRACING_ENABLED") - .ok() - .and_then(|s| s.parse().ok()) - .unwrap_or(false) - }); - - debug!( - "TraceCollector initialized: flush_interval={}ms, url={}, enabled={}", - flush_interval_ms, otel_url, enabled - ); - - Self { - spans_by_service: Arc::new(Mutex::new(HashMap::new())), - flush_interval: Duration::from_millis(flush_interval_ms), - otel_url, - enabled, - } - } - - /// Record a span for a specific service - /// - /// # Arguments - /// * `service_name` - Name of the service (e.g., "archgw(routing)", "archgw(llm)") - /// * `span` - The span to record - pub fn record_span(&self, service_name: impl Into, span: Span) { - // Skip recording if tracing is disabled - if !self.enabled { - return; - } - - let service_name = service_name.into(); - - // Use try_lock to avoid blocking in async contexts - // If the lock is held, we skip recording (telemetry shouldn't block the app) - if let Ok(mut spans_by_service) = self.spans_by_service.try_lock() { - // Get or create the queue for this service - let spans = spans_by_service - .entry(service_name) - .or_insert_with(VecDeque::new); - - spans.push_back(span); - } else { - // Lock contention - skip recording this span - debug!("Skipped span recording due to lock contention"); - } - // Flushing is handled by the periodic background flusher (see `start_background_flusher`). - } - - /// Flush all buffered spans to the OTEL collector - /// Builds ResourceSpans for each service with spans - pub async fn flush(&self) -> Result<(), Box> { - // Skip flushing if tracing is disabled - if !self.enabled { - return Ok(()); - } - - let mut spans_by_service = self.spans_by_service.lock().await; - - if spans_by_service.is_empty() { - return Ok(()); - } - - // Snapshot and drain all services' spans - let service_batches: Vec<(String, Vec)> = spans_by_service - .iter_mut() - .filter_map(|(service_name, spans)| { - if spans.is_empty() { - None - } else { - Some((service_name.clone(), spans.drain(..).collect())) - } - }) - .collect(); - - drop(spans_by_service); // Release lock before HTTP call - - if service_batches.is_empty() { - return Ok(()); - } - - let total_spans: usize = service_batches.iter().map(|(_, spans)| spans.len()).sum(); - debug!( - "Flushing {} spans across {} services to OTEL collector", - total_spans, - service_batches.len() - ); - - // Build canonical OTEL payload structure - one ResourceSpan per service - let resource_spans = self.build_resource_spans(service_batches); - - match self.send_to_otel(resource_spans).await { - Ok(_) => { - debug!("Successfully flushed {} spans", total_spans); - Ok(()) - } - Err(e) => { - warn!("Failed to send spans to OTEL collector: {:?}", e); - Err(e) - } - } - } - - /// Build OTEL-compliant resource spans from collected spans, one ResourceSpan per service - fn build_resource_spans( - &self, - service_batches: Vec<(String, Vec)>, - ) -> Vec { - service_batches - .into_iter() - .map(|(service_name, spans)| { - ResourceSpanBuilder::new(&service_name) - .add_spans(spans) - .build() - }) - .collect() - } - - /// Send resource spans to OTEL collector - /// Serializes as {"resourceSpans": [...]} per OTEL spec - async fn send_to_otel( - &self, - resource_spans: Vec, - ) -> Result<(), Box> { - let client = reqwest::Client::new(); - - // Create OTEL payload with proper structure - let payload = serde_json::json!({ - "resourceSpans": resource_spans - }); - - let response = client - .post(&self.otel_url) - .header("Content-Type", "application/json") - .json(&payload) - .timeout(Duration::from_secs(5)) - .send() - .await?; - - if !response.status().is_success() { - warn!( - "OTEL collector returned non-success status: {}", - response.status() - ); - return Err(format!("OTEL collector error: {}", response.status()).into()); - } - - Ok(()) - } - - /// Start a background task that periodically flushes traces - /// Returns a join handle that can be used to stop the flusher - pub fn start_background_flusher(self: Arc) -> tokio::task::JoinHandle<()> { - let flush_interval = self.flush_interval; - - tokio::spawn(async move { - let mut ticker = interval(flush_interval); - - loop { - ticker.tick().await; - - if let Err(e) = self.flush().await { - error!("Background trace flush failed: {:?}", e); - } - } - }) - } - - /// Get current number of buffered spans across all services (for testing/monitoring) - pub async fn buffered_count(&self) -> usize { - self.spans_by_service - .lock() - .await - .values() - .map(|spans| spans.len()) - .sum() - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::traces::SpanBuilder; - - #[tokio::test] - async fn test_collector_basic() { - let collector = TraceCollector::new(Some(true)); - - let span = SpanBuilder::new("test_operation") - .with_trace_id("abc123") - .build(); - - collector.record_span("test-service", span); - - assert_eq!(collector.buffered_count().await, 1); - } - - #[tokio::test] - async fn test_collector_auto_flush() { - // Since batch-triggered flush behavior was removed, record two spans and verify both are buffered - let collector = Arc::new(TraceCollector::new(Some(true))); - - let span1 = SpanBuilder::new("test1").build(); - let span2 = SpanBuilder::new("test2").build(); - - collector.record_span("test-service", span1); - collector.record_span("test-service", span2); - - // With no batch-triggered flush, both spans should remain buffered - assert_eq!(collector.buffered_count().await, 2); - } -} diff --git a/crates/common/src/traces/mod.rs b/crates/common/src/traces/mod.rs index 6181f194..e9177073 100644 --- a/crates/common/src/traces/mod.rs +++ b/crates/common/src/traces/mod.rs @@ -5,12 +5,6 @@ mod constants; mod resource_span_builder; mod span_builder; -#[cfg(feature = "trace-collection")] -mod collector; - -#[cfg(all(test, feature = "trace-collection"))] -mod tests; - // Re-export original types pub use shapes::{ Attribute, AttributeValue, Event, Resource, ResourceSpan, Scope, ScopeSpan, Span, Traceparent, @@ -21,6 +15,3 @@ pub use shapes::{ pub use constants::*; pub use resource_span_builder::ResourceSpanBuilder; pub use span_builder::{generate_random_span_id, SpanBuilder, SpanKind}; - -#[cfg(feature = "trace-collection")] -pub use collector::{parse_traceparent, TraceCollector}; diff --git a/crates/common/src/traces/tests/mock_otel_collector.rs b/crates/common/src/traces/tests/mock_otel_collector.rs deleted file mode 100644 index 8c8e770d..00000000 --- a/crates/common/src/traces/tests/mock_otel_collector.rs +++ /dev/null @@ -1,96 +0,0 @@ -//! Mock OTEL Collector for testing trace output -//! -//! This module provides a simple HTTP server that mimics an OTEL collector. -//! It exposes three endpoints: -//! - POST /v1/traces: Capture incoming OTLP JSON payloads -//! - GET /v1/traces: Return all captured payloads as JSON array -//! - DELETE /v1/traces: Clear all captured payloads -//! -//! Each test creates its own MockOtelCollector instance. - -use axum::{ - extract::State, - http::StatusCode, - routing::{delete, get, post}, - Json, Router, -}; -use serde_json::Value; -use std::sync::Arc; -use tokio::sync::RwLock; - -type SharedTraces = Arc>>; - -/// POST /v1/traces - capture incoming OTLP payload -async fn post_traces(State(traces): State, Json(payload): Json) -> StatusCode { - traces.write().await.push(payload); - StatusCode::OK -} - -/// GET /v1/traces - return all captured payloads -async fn get_traces(State(traces): State) -> Json> { - Json(traces.read().await.clone()) -} - -/// DELETE /v1/traces - clear all captured payloads -async fn delete_traces(State(traces): State) -> StatusCode { - traces.write().await.clear(); - StatusCode::NO_CONTENT -} - -/// Mock OTEL collector server -pub struct MockOtelCollector { - address: String, - client: reqwest::Client, - #[allow(dead_code)] - server_handle: tokio::task::JoinHandle<()>, -} - -impl MockOtelCollector { - /// Create and start a new mock collector on a random port - pub async fn start() -> Self { - let traces = Arc::new(RwLock::new(Vec::new())); - - let app = Router::new() - .route("/v1/traces", post(post_traces)) - .route("/v1/traces", get(get_traces)) - .route("/v1/traces", delete(delete_traces)) - .with_state(traces.clone()); - - let listener = tokio::net::TcpListener::bind("127.0.0.1:0") - .await - .expect("Failed to bind to random port"); - - let addr = listener.local_addr().expect("Failed to get local address"); - let address = format!("http://127.0.0.1:{}", addr.port()); - - let server_handle = tokio::spawn(async move { - axum::serve(listener, app).await.expect("Server failed"); - }); - - // Give server a moment to start - tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; - - Self { - address, - client: reqwest::Client::new(), - server_handle, - } - } - - /// Get the address of the collector - pub fn address(&self) -> &str { - &self.address - } - - /// GET /v1/traces - fetch all captured payloads - pub async fn get_traces(&self) -> Vec { - self.client - .get(format!("{}/v1/traces", self.address)) - .send() - .await - .expect("Failed to GET traces") - .json() - .await - .expect("Failed to parse traces JSON") - } -} diff --git a/crates/common/src/traces/tests/mod.rs b/crates/common/src/traces/tests/mod.rs deleted file mode 100644 index 7bba42f8..00000000 --- a/crates/common/src/traces/tests/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod mock_otel_collector; -mod trace_integration_test; - -pub use mock_otel_collector::MockOtelCollector; diff --git a/crates/common/src/traces/tests/trace_integration_test.rs b/crates/common/src/traces/tests/trace_integration_test.rs deleted file mode 100644 index 5f41a2c3..00000000 --- a/crates/common/src/traces/tests/trace_integration_test.rs +++ /dev/null @@ -1,342 +0,0 @@ -//! Integration tests for OpenTelemetry tracing in router.rs -//! -//! These tests validate that the spans created for LLM requests contain -//! all expected attributes and events by checking the raw JSON payloads -//! sent to the mock OTEL collector. -//! -//! ## Test Design -//! Each test creates its own MockOtelCollector and TraceCollector: -//! 1. Start MockOtelCollector on random port -//! 2. Create TraceCollector with 500ms flush interval -//! 3. Record spans using TraceCollector -//! 4. Flush and wait (500ms + 200ms buffer = 700ms total) for spans to arrive -//! 5. Get raw JSON payloads (GET /v1/traces) and validate structure -//! 6. Test cleanup happens automatically when collectors are dropped -//! -//! ## Serial Execution -//! Tests use the `#[serial]` attribute to run sequentially because they -//! use global environment variables (OTEL_COLLECTOR_URL, OTEL_TRACING_ENABLED, -//! TRACE_FLUSH_INTERVAL_MS). This ensures test isolation without requiring -//! the `--test-threads=1` command line flag. - -const FLUSH_INTERVAL_MS: u64 = 50; -const FLUSH_BUFFER_MS: u64 = 50; -const TOTAL_WAIT_MS: u64 = FLUSH_INTERVAL_MS + FLUSH_BUFFER_MS; - -use crate::traces::{SpanBuilder, SpanKind, TraceCollector}; -use serde_json::Value; -use serial_test::serial; -use std::sync::Arc; - -use super::MockOtelCollector; - -/// Helper to extract all spans from OTLP JSON payloads -fn extract_spans(payloads: &[Value]) -> Vec<&Value> { - let mut spans = Vec::new(); - for payload in payloads { - if let Some(resource_spans) = payload.get("resourceSpans").and_then(|v| v.as_array()) { - for resource_span in resource_spans { - if let Some(scope_spans) = - resource_span.get("scopeSpans").and_then(|v| v.as_array()) - { - for scope_span in scope_spans { - if let Some(span_list) = scope_span.get("spans").and_then(|v| v.as_array()) - { - spans.extend(span_list.iter()); - } - } - } - } - } - } - spans -} - -/// Helper to get string attribute value from a span -fn get_string_attr<'a>(span: &'a Value, key: &str) -> Option<&'a str> { - span.get("attributes") - .and_then(|attrs| attrs.as_array()) - .and_then(|attrs| { - attrs - .iter() - .find(|attr| attr.get("key").and_then(|k| k.as_str()) == Some(key)) - }) - .and_then(|attr| attr.get("value")) - .and_then(|v| v.get("stringValue")) - .and_then(|v| v.as_str()) -} - -#[tokio::test] -#[serial] -async fn test_llm_span_contains_basic_attributes() { - // Start mock OTEL collector - let mock_collector = MockOtelCollector::start().await; - - // Create TraceCollector pointing to mock with 500ms flush intervalc - std::env::set_var( - "OTEL_COLLECTOR_URL", - format!("{}/v1/traces", mock_collector.address()), - ); - std::env::set_var("OTEL_TRACING_ENABLED", "true"); - std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); - let trace_collector = Arc::new(TraceCollector::new(Some(true))); - - // Create a test span simulating router.rs behavior - let span = SpanBuilder::new("POST /v1/chat/completions >> /v1/chat/completions") - .with_kind(SpanKind::Client) - .with_trace_id("test-trace-123") - .with_attribute("http.method", "POST") - .with_attribute("http.target", "/v1/chat/completions") - .with_attribute("http.upstream_target", "/v1/chat/completions") - .with_attribute("llm.model", "gpt-4o") - .with_attribute("llm.provider", "openai") - .with_attribute("llm.is_streaming", "true") - .with_attribute("llm.temperature", "0.7") - .build(); - - trace_collector.record_span("archgw(llm)", span); - - // Flush and wait for spans to arrive (500ms flush interval + 200ms buffer) - trace_collector.flush().await.expect("Failed to flush"); - tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; - - let payloads = mock_collector.get_traces().await; - let spans = extract_spans(&payloads); - - assert_eq!(spans.len(), 1, "Expected exactly one span"); - - let span = spans[0]; - // Validate HTTP attributes - assert_eq!(get_string_attr(span, "http.method"), Some("POST")); - assert_eq!( - get_string_attr(span, "http.target"), - Some("/v1/chat/completions") - ); - - // Validate LLM attributes - assert_eq!(get_string_attr(span, "llm.model"), Some("gpt-4o")); - assert_eq!(get_string_attr(span, "llm.provider"), Some("openai")); - assert_eq!(get_string_attr(span, "llm.is_streaming"), Some("true")); - assert_eq!(get_string_attr(span, "llm.temperature"), Some("0.7")); -} - -#[tokio::test] -#[serial] -async fn test_llm_span_contains_tool_information() { - let mock_collector = MockOtelCollector::start().await; - std::env::set_var( - "OTEL_COLLECTOR_URL", - format!("{}/v1/traces", mock_collector.address()), - ); - std::env::set_var("OTEL_TRACING_ENABLED", "true"); - std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); - let trace_collector = Arc::new(TraceCollector::new(Some(true))); - - let tools_formatted = "get_weather(...)\nsearch_web(...)\ncalculate(...)"; - - let span = SpanBuilder::new("POST /v1/chat/completions") - .with_trace_id("test-trace-tools") - .with_attribute("llm.request.tools", tools_formatted) - .with_attribute("llm.model", "gpt-4o") - .build(); - - trace_collector.record_span("archgw(llm)", span); - trace_collector.flush().await.expect("Failed to flush"); - tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; - - let payloads = mock_collector.get_traces().await; - let spans = extract_spans(&payloads); - - assert!(!spans.is_empty(), "No spans captured"); - - let span = spans[0]; - let tools = get_string_attr(span, "llm.request.tools"); - - assert!(tools.is_some(), "Tools attribute missing"); - assert!(tools.unwrap().contains("get_weather(...)")); - assert!(tools.unwrap().contains("search_web(...)")); - assert!(tools.unwrap().contains("calculate(...)")); - assert!( - tools.unwrap().contains('\n'), - "Tools should be newline-separated" - ); -} - -#[tokio::test] -#[serial] -async fn test_llm_span_contains_user_message_preview() { - let mock_collector = MockOtelCollector::start().await; - std::env::set_var( - "OTEL_COLLECTOR_URL", - format!("{}/v1/traces", mock_collector.address()), - ); - std::env::set_var("OTEL_TRACING_ENABLED", "true"); - std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); - let trace_collector = Arc::new(TraceCollector::new(Some(true))); - - let long_message = - "This is a very long user message that should be truncated to 50 characters in the span"; - let preview = if long_message.len() > 50 { - format!("{}...", &long_message[..50]) - } else { - long_message.to_string() - }; - - let span = SpanBuilder::new("POST /v1/messages") - .with_trace_id("test-trace-preview") - .with_attribute("llm.request.user_message_preview", &preview) - .build(); - - trace_collector.record_span("archgw(llm)", span); - trace_collector.flush().await.expect("Failed to flush"); - tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; - - let payloads = mock_collector.get_traces().await; - let spans = extract_spans(&payloads); - let span = spans[0]; - - let message_preview = get_string_attr(span, "llm.request.user_message_preview"); - - assert!(message_preview.is_some()); - assert!(message_preview.unwrap().len() <= 53); // 50 chars + "..." - assert!(message_preview.unwrap().contains("...")); -} - -#[tokio::test] -#[serial] -async fn test_llm_span_contains_time_to_first_token() { - let mock_collector = MockOtelCollector::start().await; - std::env::set_var( - "OTEL_COLLECTOR_URL", - format!("{}/v1/traces", mock_collector.address()), - ); - std::env::set_var("OTEL_TRACING_ENABLED", "true"); - std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); - let trace_collector = Arc::new(TraceCollector::new(Some(true))); - - let ttft_ms = "245"; // milliseconds as string - - let span = SpanBuilder::new("POST /v1/chat/completions") - .with_trace_id("test-trace-ttft") - .with_attribute("llm.is_streaming", "true") - .with_attribute("llm.time_to_first_token_ms", ttft_ms) - .build(); - - trace_collector.record_span("archgw(llm)", span); - trace_collector.flush().await.expect("Failed to flush"); - tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; - - let payloads = mock_collector.get_traces().await; - let spans = extract_spans(&payloads); - let span = spans[0]; - - // Check TTFT attribute - let ttft_attr = get_string_attr(span, "llm.time_to_first_token_ms"); - assert_eq!(ttft_attr, Some("245")); -} - -#[tokio::test] -#[serial] -async fn test_llm_span_contains_upstream_path() { - let mock_collector = MockOtelCollector::start().await; - std::env::set_var( - "OTEL_COLLECTOR_URL", - format!("{}/v1/traces", mock_collector.address()), - ); - std::env::set_var("OTEL_TRACING_ENABLED", "true"); - std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); - let trace_collector = Arc::new(TraceCollector::new(Some(true))); - - // Test Zhipu provider with path transformation - let span = SpanBuilder::new("POST /v1/chat/completions >> /api/paas/v4/chat/completions") - .with_trace_id("test-trace-upstream") - .with_attribute("http.upstream_target", "/api/paas/v4/chat/completions") - .with_attribute("llm.provider", "zhipu") - .with_attribute("llm.model", "glm-4") - .build(); - - trace_collector.record_span("archgw(llm)", span); - trace_collector.flush().await.expect("Failed to flush"); - tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; - - let payloads = mock_collector.get_traces().await; - let spans = extract_spans(&payloads); - let span = spans[0]; - - // Operation name should show the transformation - let name = span.get("name").and_then(|v| v.as_str()); - assert!(name.is_some()); - assert!( - name.unwrap().contains(">>"), - "Operation name should show path transformation" - ); - - // Check upstream target attribute - let upstream = get_string_attr(span, "http.upstream_target"); - assert_eq!(upstream, Some("/api/paas/v4/chat/completions")); -} - -#[tokio::test] -#[serial] -async fn test_llm_span_multiple_services() { - let mock_collector = MockOtelCollector::start().await; - std::env::set_var( - "OTEL_COLLECTOR_URL", - format!("{}/v1/traces", mock_collector.address()), - ); - std::env::set_var("OTEL_TRACING_ENABLED", "true"); - std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); - let trace_collector = Arc::new(TraceCollector::new(Some(true))); - - // Create spans for different services - let llm_span = SpanBuilder::new("LLM Request") - .with_trace_id("test-multi") - .with_attribute("service", "llm") - .build(); - - let routing_span = SpanBuilder::new("Routing Decision") - .with_trace_id("test-multi") - .with_attribute("service", "routing") - .build(); - - trace_collector.record_span("archgw(llm)", llm_span); - trace_collector.record_span("archgw(routing)", routing_span); - trace_collector.flush().await.expect("Failed to flush"); - tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; - - let payloads = mock_collector.get_traces().await; - let all_spans = extract_spans(&payloads); - - assert_eq!(all_spans.len(), 2, "Should have captured both spans"); -} - -#[tokio::test] -#[serial] -async fn test_tracing_disabled_produces_no_spans() { - let mock_collector = MockOtelCollector::start().await; - - // Create TraceCollector with tracing DISABLED - std::env::set_var( - "OTEL_COLLECTOR_URL", - format!("{}/v1/traces", mock_collector.address()), - ); - std::env::set_var("OTEL_TRACING_ENABLED", "false"); - std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); - let trace_collector = Arc::new(TraceCollector::new(Some(false))); - - let span = SpanBuilder::new("Test Span") - .with_trace_id("test-disabled") - .build(); - - trace_collector.record_span("archgw(llm)", span); - trace_collector.flush().await.ok(); // Should be no-op when disabled - tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; - - let payloads = mock_collector.get_traces().await; - let all_spans = extract_spans(&payloads); - assert_eq!( - all_spans.len(), - 0, - "No spans should be captured when tracing is disabled" - ); -} diff --git a/demos/use_cases/llm_routing/docker-compose.yaml b/demos/use_cases/llm_routing/docker-compose.yaml index 08aa7107..f8330269 100644 --- a/demos/use_cases/llm_routing/docker-compose.yaml +++ b/demos/use_cases/llm_routing/docker-compose.yaml @@ -10,7 +10,7 @@ services: environment: - ARCH_CONFIG_PATH=/app/arch_config.yaml - OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set} - - OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces + - OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317 volumes: - ./config.yaml:/app/arch_config.yaml:ro - /etc/ssl/cert.pem:/etc/ssl/cert.pem diff --git a/demos/use_cases/multi_agent_with_crewai_langchain/docker-compose.yaml b/demos/use_cases/multi_agent_with_crewai_langchain/docker-compose.yaml index d76eeb75..5f49fd15 100644 --- a/demos/use_cases/multi_agent_with_crewai_langchain/docker-compose.yaml +++ b/demos/use_cases/multi_agent_with_crewai_langchain/docker-compose.yaml @@ -11,7 +11,7 @@ services: environment: - ARCH_CONFIG_PATH=/app/arch_config.yaml - OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set} - - OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces + - OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317 volumes: - ./config.yaml:/app/arch_config.yaml:ro - /etc/ssl/cert.pem:/etc/ssl/cert.pem diff --git a/demos/use_cases/preference_based_routing/docker-compose.yaml b/demos/use_cases/preference_based_routing/docker-compose.yaml index 2bdbe8b1..f566448f 100644 --- a/demos/use_cases/preference_based_routing/docker-compose.yaml +++ b/demos/use_cases/preference_based_routing/docker-compose.yaml @@ -11,20 +11,22 @@ services: - ARCH_CONFIG_PATH=/app/arch_config.yaml - OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:?ANTHROPIC_API_KEY environment variable is required but not set} - - OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces + - OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317 + - OTEL_TRACING_ENABLED=true + - RUST_LOG=debug volumes: - ./config.yaml:/app/arch_config.yaml:ro - /etc/ssl/cert.pem:/etc/ssl/cert.pem - open-web-ui: - image: dyrnq/open-webui:main - restart: always - ports: - - "8080:8080" - environment: - - DEFAULT_MODELS=gpt-4o-mini - - ENABLE_OPENAI_API=true - - OPENAI_API_BASE_URL=http://host.docker.internal:12000/v1 + # open-web-ui: + # image: dyrnq/open-webui:main + # restart: always + # ports: + # - "8080:8080" + # environment: + # - DEFAULT_MODELS=gpt-4o-mini + # - ENABLE_OPENAI_API=true + # - OPENAI_API_BASE_URL=http://host.docker.internal:12000/v1 jaeger: build: @@ -34,12 +36,12 @@ services: - "4317:4317" - "4318:4318" - prometheus: - build: - context: ../../shared/prometheus + # prometheus: + # build: + # context: ../../shared/prometheus - grafana: - build: - context: ../../shared/grafana - ports: - - "3000:3000" + # grafana: + # build: + # context: ../../shared/grafana + # ports: + # - "3000:3000" diff --git a/tests/e2e/docker-compose.yaml b/tests/e2e/docker-compose.yaml index 31dd28de..fa171051 100644 --- a/tests/e2e/docker-compose.yaml +++ b/tests/e2e/docker-compose.yaml @@ -16,4 +16,4 @@ services: - OPENAI_API_KEY=${OPENAI_API_KEY:?error} - MISTRAL_API_KEY=${MISTRAL_API_KEY:?error} - ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:?error} - - OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces + - OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317 diff --git a/tests/rest/tracing.rest b/tests/rest/tracing.rest deleted file mode 100644 index e277dedd..00000000 --- a/tests/rest/tracing.rest +++ /dev/null @@ -1,31 +0,0 @@ -POST http://localhost:4318/v1/traces -Content-Type: application/json - -{ - "resourceSpans": [ - { - "resource": { - "attributes": [ - { "key": "service.name", "value": { "stringValue": "upstream-llm" } } - ] - }, - "scopeSpans": [ - { - "scope": { "name": "default", "version": "1.0", "attributes": [] }, - "spans": [ - { - "traceId": "fa8f7c410c28092faafbd7d4a2f5e742", - "spanId": "4dc43055a07410d6", - "parentSpanId": "f0acd74216a5e179", - "name": "archgw", - "startTimeUnixNano": "1731363782228270000", - "endTimeUnixNano": "1731363787843156000", - "kind": 1, - "attributes": [] - } - ] - } - ] - } - ] -}