remove unused trace collector and fix env var

This commit is contained in:
Adil Hafeez 2026-02-07 13:08:32 -08:00
parent 7eec3bc932
commit c999b36c41
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
24 changed files with 31 additions and 864 deletions

View file

@ -29,6 +29,3 @@ jobs:
- name: Run unit tests
run: cargo test --lib
- name: Run trace integration tests
run: cargo test -p common --features trace-collection traces::tests::trace_integration_test

View file

@ -25,8 +25,7 @@
"[python]": {
"editor.defaultFormatter": "ms-python.black-formatter",
"editor.formatOnSave": true
},
"rust-analyzer.cargo.features": ["trace-collection"]
}
},
"extensions": {
"recommendations": [

View file

@ -153,7 +153,7 @@ def up(file, path, foreground):
# Set the ARCH_CONFIG_FILE environment variable
env_stage = {
"OTEL_TRACING_HTTP_ENDPOINT": "http://host.docker.internal:4318/v1/traces",
"OTEL_TRACING_GRPC_ENDPOINT": "http://host.docker.internal:4317",
}
env = os.environ.copy()
# Remove PATH variable if present

View file

@ -21,4 +21,4 @@ services:
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY:?error}
- MISTRAL_API_KEY=${MISTRAL_API_KEY:?error}
- OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces
- OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317

View file

@ -470,50 +470,6 @@ static_resources:
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
{% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %}
- name: otel_collector_proxy
address:
socket_address:
address: 127.0.0.1
port_value: 9903
traffic_direction: OUTBOUND
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: otel_proxy
codec_type: AUTO
# access_log:
# - name: envoy.access_loggers.file
# typed_config:
# "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
# path: "/var/log/access_otel.log"
# format: |
# [%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%"
route_config:
name: otel_route
virtual_hosts:
- name: otel_backend
domains: ["*"]
routes:
- match:
prefix: "/v1/traces"
route:
cluster: opentelemetry_collector_http
timeout: 5s
retry_policy:
retry_on: "5xx,connect-failure,refused-stream,reset"
num_retries: 3
per_try_timeout: 2s
host_selection_retry_max_attempts: 5
retriable_status_codes: [500, 502, 503, 504]
http_filters:
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
{% endif %}
- name: egress_traffic_llm
address:
socket_address:

View file

@ -15,7 +15,7 @@
"RUST_BACKTRACE": "1",
"ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/multi_agent_with_crewai_langchain/config.yaml_rendered",
// "ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/preference_based_routing/config.yaml_rendered",
"OTEL_COLLECTOR_URL": "http://localhost:4317",
"OTEL_TRACING_GRPC_ENDPOINT": "http://localhost:4317",
"OTEL_TRACING_ENABLED": "true"
},
"preLaunchTask": "rust: cargo build"

2
crates/Cargo.lock generated
View file

@ -445,7 +445,6 @@ dependencies = [
"pretty_assertions",
"proxy-wasm",
"rand 0.8.5",
"reqwest",
"serde",
"serde_json",
"serde_with",
@ -454,7 +453,6 @@ dependencies = [
"thiserror 1.0.69",
"tiktoken-rs",
"tokio",
"tracing",
"url",
"urlencoding",
]

View file

@ -1,7 +1,3 @@
[workspace]
resolver = "2"
members = ["llm_gateway", "prompt_gateway", "common", "brightstaff", "hermesllm"]
[workspace.metadata.rust-analyzer]
# Enable features for better IDE support
cargo.features = ["trace-collection"]

View file

@ -8,7 +8,7 @@ async-openai = "0.30.1"
async-trait = "0.1"
bytes = "1.10.1"
chrono = "0.4"
common = { version = "0.1.0", path = "../common", features = ["trace-collection"] }
common = { version = "0.1.0", path = "../common" }
eventsource-client = "0.15.0"
eventsource-stream = "0.2.3"
flate2 = "1.0"

View file

@ -177,7 +177,7 @@ async fn handle_agent_chat_inner(
};
get_active_span(|span| {
span.update_name(format!("(orchestrator) {}", listener.name));
span.update_name(listener.name.to_string());
});
info!(listener = %listener.name, "handling request");

View file

@ -643,7 +643,7 @@ impl PipelineProcessor {
let request_url = "/v1/chat/completions";
get_active_span(|span| {
span.update_name(format!("(agent) {} {}", terminal_agent.id, request_url));
span.update_name(format!("{} {}", terminal_agent.id, request_url));
});
let request_body = ProviderRequestType::to_bytes(&original_request).unwrap();

View file

@ -85,7 +85,7 @@ pub fn init_tracer() -> &'static SdkTracerProvider {
global::set_text_map_propagator(TraceContextPropagator::new());
// Get OTEL collector URL from environment
let otel_endpoint = std::env::var("OTEL_COLLECTOR_URL")
let otel_endpoint = std::env::var("OTEL_TRACING_GRPC_ENDPOINT")
.unwrap_or_else(|_| "http://localhost:4317".to_string());
let tracing_enabled = std::env::var("OTEL_TRACING_ENABLED")

View file

@ -21,14 +21,8 @@ url = "2.5.4"
hermesllm = { version = "0.1.0", path = "../hermesllm" }
serde_with = "3.13.0"
# Optional dependencies for trace collection (not available in WASM)
tokio = { version = "1.44", features = ["sync", "time"], optional = true }
reqwest = { version = "0.12", features = ["json"], optional = true }
tracing = { version = "0.1", optional = true }
[features]
default = []
trace-collection = ["tokio", "reqwest", "tracing"]
[dev-dependencies]
pretty_assertions = "1.4.1"

View file

@ -30,7 +30,6 @@ pub const ARCH_MODEL_PREFIX: &str = "Arch";
pub const HALLUCINATION_TEMPLATE: &str =
"It seems I'm missing some information. Could you provide the following details ";
pub const OTEL_COLLECTOR_HTTP: &str = "opentelemetry_collector_http";
pub const OTEL_POST_PATH: &str = "/v1/traces";
pub const LLM_ROUTE_HEADER: &str = "x-arch-llm-route";
pub const ENVOY_RETRY_HEADER: &str = "x-envoy-max-retries";
pub const BRIGHT_STAFF_SERVICE_NAME: &str = "brightstaff";

View file

@ -1,292 +0,0 @@
use super::resource_span_builder::ResourceSpanBuilder;
use super::shapes::Span;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{interval, Duration};
use tracing::{debug, error, warn};
/// Split a W3C `traceparent` header into its trace id and parent span id.
///
/// The header is expected to have four dash-separated fields, e.g.
/// "00-{trace_id}-{parent_span_id}-01".
///
/// Returns `(trace_id, Option<parent_span_id>)`:
/// - the parent is `None` when the parent span id is all zeros
///   (0000000000000000), which marks a root span;
/// - when the header does not have exactly four fields, a warning is logged
///   and `(String::new(), None)` is returned.
pub fn parse_traceparent(traceparent: &str) -> (String, Option<String>) {
    // An all-zero parent id means "no parent".
    const ROOT_PARENT_ID: &str = "0000000000000000";

    let fields: Vec<&str> = traceparent.split('-').collect();
    match fields.as_slice() {
        [_version, trace_id, parent_id, _flags] => {
            let parent = if *parent_id == ROOT_PARENT_ID {
                None
            } else {
                Some(parent_id.to_string())
            };
            (trace_id.to_string(), parent)
        }
        _ => {
            warn!("Invalid traceparent format: {}", traceparent);
            // Parsing failed: hand back an empty trace id and no parent.
            (String::new(), None)
        }
    }
}
/// Collects and batches spans, flushing them to an OTEL collector.
///
/// Spans are grouped per service (e.g., "archgw(routing)", "archgw(llm)"), each
/// service keeping its own queue. A flush drains every queue and ships all of
/// them in a single request.
///
/// Whether tracing is enabled is decided once, in [`TraceCollector::new`]:
/// 1. by the explicit `enabled` argument, when the caller supplies one;
/// 2. otherwise by the `OTEL_TRACING_ENABLED=true/false` environment variable.
///
/// When disabled, span recording and flushing are no-ops.
pub struct TraceCollector {
    /// Spans grouped by service name
    /// Key: service name (e.g., "archgw(routing)", "archgw(llm)")
    /// Value: queue of spans for that service
    spans_by_service: Arc<Mutex<HashMap<String, VecDeque<Span>>>>,
    /// Period used by the background flusher
    flush_interval: Duration,
    /// Full URL that span batches are POSTed to
    otel_url: String,
    /// Whether tracing is enabled
    enabled: bool,
    /// HTTP client shared by every flush so its connection pool is reused
    /// instead of being rebuilt for each request
    client: reqwest::Client,
}

impl TraceCollector {
    /// Create a new trace collector
    ///
    /// # Arguments
    /// * `enabled` - Whether tracing is enabled
    ///   - `Some(true)` - Force enable tracing
    ///   - `Some(false)` - Force disable tracing
    ///   - `None` - Check `OTEL_TRACING_ENABLED` env var (defaults to false if
    ///     not set or not parseable as a bool; tracing is opt-in)
    ///
    /// Other parameters are read from environment variables:
    /// - `TRACE_FLUSH_INTERVAL_MS` - Flush interval in milliseconds (default: 1000;
    ///   a value of 0 or an unparseable value also falls back to the default)
    /// - `OTEL_COLLECTOR_URL` - OTEL collector endpoint (default: http://localhost:9903/v1/traces)
    pub fn new(enabled: Option<bool>) -> Self {
        let flush_interval_ms = std::env::var("TRACE_FLUSH_INTERVAL_MS")
            .ok()
            .and_then(|s| s.parse::<u64>().ok())
            // A zero period makes `tokio::time::interval` panic, which would
            // kill the background flusher task; treat it like an unset value.
            .filter(|ms| *ms > 0)
            .unwrap_or(1000);

        let otel_url = std::env::var("OTEL_COLLECTOR_URL")
            .unwrap_or_else(|_| "http://localhost:9903/v1/traces".to_string());

        // Determine if tracing is enabled:
        // 1. Use explicit parameter if provided
        // 2. Otherwise check OTEL_TRACING_ENABLED env var
        // 3. Default to false if neither is set (tracing opt-in, not opt-out)
        let enabled = enabled.unwrap_or_else(|| {
            std::env::var("OTEL_TRACING_ENABLED")
                .ok()
                .and_then(|s| s.parse().ok())
                .unwrap_or(false)
        });

        debug!(
            "TraceCollector initialized: flush_interval={}ms, url={}, enabled={}",
            flush_interval_ms, otel_url, enabled
        );

        Self {
            spans_by_service: Arc::new(Mutex::new(HashMap::new())),
            flush_interval: Duration::from_millis(flush_interval_ms),
            otel_url,
            enabled,
            client: reqwest::Client::new(),
        }
    }

    /// Record a span for a specific service
    ///
    /// # Arguments
    /// * `service_name` - Name of the service (e.g., "archgw(routing)", "archgw(llm)")
    /// * `span` - The span to record
    ///
    /// The span is silently dropped when tracing is disabled or when the
    /// buffer lock is currently held by someone else.
    pub fn record_span(&self, service_name: impl Into<String>, span: Span) {
        // Skip recording if tracing is disabled
        if !self.enabled {
            return;
        }

        let service_name = service_name.into();

        // Use try_lock to avoid blocking in async contexts
        // If the lock is held, we skip recording (telemetry shouldn't block the app)
        if let Ok(mut spans_by_service) = self.spans_by_service.try_lock() {
            // Get or create the queue for this service
            spans_by_service
                .entry(service_name)
                .or_default()
                .push_back(span);
        } else {
            // Lock contention - skip recording this span
            debug!("Skipped span recording due to lock contention");
        }
        // Flushing is handled by the periodic background flusher (see `start_background_flusher`).
    }

    /// Flush all buffered spans to the OTEL collector
    /// Builds ResourceSpans for each service with spans
    ///
    /// Queues are drained before the HTTP call is made, so a batch whose send
    /// fails is not put back into the buffer.
    ///
    /// # Errors
    /// Returns the error from the HTTP request, or an error describing the
    /// status code when the collector answers with a non-success status.
    pub async fn flush(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Skip flushing if tracing is disabled
        if !self.enabled {
            return Ok(());
        }

        let mut spans_by_service = self.spans_by_service.lock().await;

        if spans_by_service.is_empty() {
            return Ok(());
        }

        // Snapshot and drain all services' spans
        let service_batches: Vec<(String, Vec<Span>)> = spans_by_service
            .iter_mut()
            .filter_map(|(service_name, spans)| {
                if spans.is_empty() {
                    None
                } else {
                    Some((service_name.clone(), spans.drain(..).collect()))
                }
            })
            .collect();

        drop(spans_by_service); // Release lock before HTTP call

        if service_batches.is_empty() {
            return Ok(());
        }

        let total_spans: usize = service_batches.iter().map(|(_, spans)| spans.len()).sum();
        debug!(
            "Flushing {} spans across {} services to OTEL collector",
            total_spans,
            service_batches.len()
        );

        // Build canonical OTEL payload structure - one ResourceSpan per service
        let resource_spans = self.build_resource_spans(service_batches);

        match self.send_to_otel(resource_spans).await {
            Ok(_) => {
                debug!("Successfully flushed {} spans", total_spans);
                Ok(())
            }
            Err(e) => {
                warn!("Failed to send spans to OTEL collector: {:?}", e);
                Err(e)
            }
        }
    }

    /// Build OTEL-compliant resource spans from collected spans, one ResourceSpan per service
    fn build_resource_spans(
        &self,
        service_batches: Vec<(String, Vec<Span>)>,
    ) -> Vec<super::shapes::ResourceSpan> {
        service_batches
            .into_iter()
            .map(|(service_name, spans)| {
                ResourceSpanBuilder::new(&service_name)
                    .add_spans(spans)
                    .build()
            })
            .collect()
    }

    /// Send resource spans to OTEL collector
    /// Serializes as {"resourceSpans": [...]} per OTEL spec
    ///
    /// Each request is given a 5 second timeout.
    async fn send_to_otel(
        &self,
        resource_spans: Vec<super::shapes::ResourceSpan>,
    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
        // Create OTEL payload with proper structure
        let payload = serde_json::json!({
            "resourceSpans": resource_spans
        });

        // Reuse the collector-wide client rather than constructing one per flush
        let response = self
            .client
            .post(&self.otel_url)
            .header("Content-Type", "application/json")
            .json(&payload)
            .timeout(Duration::from_secs(5))
            .send()
            .await?;

        if !response.status().is_success() {
            warn!(
                "OTEL collector returned non-success status: {}",
                response.status()
            );
            return Err(format!("OTEL collector error: {}", response.status()).into());
        }

        Ok(())
    }

    /// Start a background task that periodically flushes traces
    /// Returns a join handle that can be used to stop the flusher
    ///
    /// The task loops forever; flush failures are logged and the loop continues.
    pub fn start_background_flusher(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
        let flush_interval = self.flush_interval;

        tokio::spawn(async move {
            let mut ticker = interval(flush_interval);
            loop {
                ticker.tick().await;
                if let Err(e) = self.flush().await {
                    error!("Background trace flush failed: {:?}", e);
                }
            }
        })
    }

    /// Get current number of buffered spans across all services (for testing/monitoring)
    pub async fn buffered_count(&self) -> usize {
        self.spans_by_service
            .lock()
            .await
            .values()
            .map(|spans| spans.len())
            .sum()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::traces::SpanBuilder;

    /// A single recorded span stays in the buffer until something flushes it.
    #[tokio::test]
    async fn test_collector_basic() {
        let collector = TraceCollector::new(Some(true));

        collector.record_span(
            "test-service",
            SpanBuilder::new("test_operation")
                .with_trace_id("abc123")
                .build(),
        );

        let buffered = collector.buffered_count().await;
        assert_eq!(buffered, 1);
    }

    /// Recording does not trigger a flush by itself: two recorded spans for
    /// the same service must both still be buffered afterwards.
    #[tokio::test]
    async fn test_collector_auto_flush() {
        let collector = Arc::new(TraceCollector::new(Some(true)));

        let first = SpanBuilder::new("test1").build();
        collector.record_span("test-service", first);
        let second = SpanBuilder::new("test2").build();
        collector.record_span("test-service", second);

        // Nothing has flushed, so the buffer holds everything recorded so far
        let buffered = collector.buffered_count().await;
        assert_eq!(buffered, 2);
    }
}

View file

@ -5,12 +5,6 @@ mod constants;
mod resource_span_builder;
mod span_builder;
#[cfg(feature = "trace-collection")]
mod collector;
#[cfg(all(test, feature = "trace-collection"))]
mod tests;
// Re-export original types
pub use shapes::{
Attribute, AttributeValue, Event, Resource, ResourceSpan, Scope, ScopeSpan, Span, Traceparent,
@ -21,6 +15,3 @@ pub use shapes::{
pub use constants::*;
pub use resource_span_builder::ResourceSpanBuilder;
pub use span_builder::{generate_random_span_id, SpanBuilder, SpanKind};
#[cfg(feature = "trace-collection")]
pub use collector::{parse_traceparent, TraceCollector};

View file

@ -1,96 +0,0 @@
//! Mock OTEL Collector for testing trace output
//!
//! This module provides a simple HTTP server that mimics an OTEL collector.
//! It exposes three endpoints:
//! - POST /v1/traces: Capture incoming OTLP JSON payloads
//! - GET /v1/traces: Return all captured payloads as JSON array
//! - DELETE /v1/traces: Clear all captured payloads
//!
//! Each test creates its own MockOtelCollector instance.
use axum::{
extract::State,
http::StatusCode,
routing::{delete, get, post},
Json, Router,
};
use serde_json::Value;
use std::sync::Arc;
use tokio::sync::RwLock;
/// Captured payloads behind an async read/write lock, shared with every route
/// handler as the router state.
type SharedTraces = Arc<RwLock<Vec<Value>>>;
/// POST /v1/traces - append the incoming OTLP JSON payload to the captured list
/// and answer 200 OK
async fn post_traces(State(traces): State<SharedTraces>, Json(payload): Json<Value>) -> StatusCode {
    let mut captured = traces.write().await;
    captured.push(payload);
    StatusCode::OK
}
/// GET /v1/traces - respond with a JSON array holding a copy of every payload
/// captured so far
async fn get_traces(State(traces): State<SharedTraces>) -> Json<Vec<Value>> {
    let snapshot = traces.read().await.clone();
    Json(snapshot)
}
/// DELETE /v1/traces - discard every captured payload and answer 204 No Content
async fn delete_traces(State(traces): State<SharedTraces>) -> StatusCode {
    let mut captured = traces.write().await;
    captured.clear();
    StatusCode::NO_CONTENT
}
/// Mock OTEL collector server
pub struct MockOtelCollector {
address: String,
client: reqwest::Client,
#[allow(dead_code)]
server_handle: tokio::task::JoinHandle<()>,
}
impl MockOtelCollector {
/// Create and start a new mock collector on a random port
pub async fn start() -> Self {
let traces = Arc::new(RwLock::new(Vec::new()));
let app = Router::new()
.route("/v1/traces", post(post_traces))
.route("/v1/traces", get(get_traces))
.route("/v1/traces", delete(delete_traces))
.with_state(traces.clone());
let listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("Failed to bind to random port");
let addr = listener.local_addr().expect("Failed to get local address");
let address = format!("http://127.0.0.1:{}", addr.port());
let server_handle = tokio::spawn(async move {
axum::serve(listener, app).await.expect("Server failed");
});
// Give server a moment to start
tokio::time::sleep(tokio::time::Duration::from_millis(50)).await;
Self {
address,
client: reqwest::Client::new(),
server_handle,
}
}
/// Get the address of the collector
pub fn address(&self) -> &str {
&self.address
}
/// GET /v1/traces - fetch all captured payloads
pub async fn get_traces(&self) -> Vec<Value> {
self.client
.get(format!("{}/v1/traces", self.address))
.send()
.await
.expect("Failed to GET traces")
.json()
.await
.expect("Failed to parse traces JSON")
}
}

View file

@ -1,4 +0,0 @@
mod mock_otel_collector;
mod trace_integration_test;
pub use mock_otel_collector::MockOtelCollector;

View file

@ -1,342 +0,0 @@
//! Integration tests for OpenTelemetry tracing in router.rs
//!
//! These tests validate that the spans created for LLM requests contain
//! all expected attributes and events by checking the raw JSON payloads
//! sent to the mock OTEL collector.
//!
//! ## Test Design
//! Each test creates its own MockOtelCollector and TraceCollector:
//! 1. Start MockOtelCollector on random port
//! 2. Create TraceCollector with 500ms flush interval
//! 3. Record spans using TraceCollector
//! 4. Flush explicitly, then wait TOTAL_WAIT_MS (FLUSH_INTERVAL_MS + FLUSH_BUFFER_MS = 100ms) for spans to arrive
//! 5. Get raw JSON payloads (GET /v1/traces) and validate structure
//! 6. Test cleanup happens automatically when collectors are dropped
//!
//! ## Serial Execution
//! Tests use the `#[serial]` attribute to run sequentially because they
//! use global environment variables (OTEL_COLLECTOR_URL, OTEL_TRACING_ENABLED,
//! TRACE_FLUSH_INTERVAL_MS). This ensures test isolation without requiring
//! the `--test-threads=1` command line flag.
// Nominal interval, in milliseconds, used only to size the post-flush wait
// below. It is not passed to the collector: the tests set
// TRACE_FLUSH_INTERVAL_MS to "500" themselves.
const FLUSH_INTERVAL_MS: u64 = 50;
// Extra slack, in milliseconds, added on top of the nominal interval.
const FLUSH_BUFFER_MS: u64 = 50;
// How long each test sleeps after calling `flush()` before reading the mock
// collector (100ms with the values above).
const TOTAL_WAIT_MS: u64 = FLUSH_INTERVAL_MS + FLUSH_BUFFER_MS;
use crate::traces::{SpanBuilder, SpanKind, TraceCollector};
use serde_json::Value;
use serial_test::serial;
use std::sync::Arc;
use super::MockOtelCollector;
/// Flatten OTLP JSON payloads into the list of span objects they contain.
///
/// Walks `resourceSpans` -> `scopeSpans` -> `spans` in every payload, in order.
/// Any level that is missing or is not a JSON array simply contributes nothing.
fn extract_spans(payloads: &[Value]) -> Vec<&Value> {
    payloads
        .iter()
        .filter_map(|payload| payload.get("resourceSpans")?.as_array())
        .flatten()
        .filter_map(|resource_span| resource_span.get("scopeSpans")?.as_array())
        .flatten()
        .filter_map(|scope_span| scope_span.get("spans")?.as_array())
        .flatten()
        .collect()
}
/// Look up a string-valued attribute on a span.
///
/// Finds the first entry of the span's `attributes` array whose `key` equals
/// `key` and returns its `value.stringValue`. Returns `None` when any step of
/// that path is absent or has the wrong JSON type.
fn get_string_attr<'a>(span: &'a Value, key: &str) -> Option<&'a str> {
    let attributes = span.get("attributes")?.as_array()?;
    let matching = attributes
        .iter()
        .find(|attr| attr.get("key").and_then(|k| k.as_str()) == Some(key))?;
    matching.get("value")?.get("stringValue")?.as_str()
}
/// A span carrying HTTP and LLM attributes must reach the collector with those
/// attributes intact.
#[tokio::test]
#[serial]
async fn test_llm_span_contains_basic_attributes() {
    // Bring up a mock OTEL collector private to this test
    let mock_collector = MockOtelCollector::start().await;

    // Point the TraceCollector at the mock through its environment variables
    let collector_url = format!("{}/v1/traces", mock_collector.address());
    std::env::set_var("OTEL_COLLECTOR_URL", collector_url);
    std::env::set_var("OTEL_TRACING_ENABLED", "true");
    std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500");
    let trace_collector = Arc::new(TraceCollector::new(Some(true)));

    // Build a span shaped like the ones router.rs produces
    let llm_span = SpanBuilder::new("POST /v1/chat/completions >> /v1/chat/completions")
        .with_kind(SpanKind::Client)
        .with_trace_id("test-trace-123")
        .with_attribute("http.method", "POST")
        .with_attribute("http.target", "/v1/chat/completions")
        .with_attribute("http.upstream_target", "/v1/chat/completions")
        .with_attribute("llm.model", "gpt-4o")
        .with_attribute("llm.provider", "openai")
        .with_attribute("llm.is_streaming", "true")
        .with_attribute("llm.temperature", "0.7")
        .build();
    trace_collector.record_span("archgw(llm)", llm_span);

    // Flush explicitly, then allow a short grace period before reading back
    trace_collector.flush().await.expect("Failed to flush");
    tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await;

    let payloads = mock_collector.get_traces().await;
    let spans = extract_spans(&payloads);
    assert_eq!(spans.len(), 1, "Expected exactly one span");
    let received = spans[0];

    // HTTP attributes first, then LLM attributes
    let expected = [
        ("http.method", "POST"),
        ("http.target", "/v1/chat/completions"),
        ("llm.model", "gpt-4o"),
        ("llm.provider", "openai"),
        ("llm.is_streaming", "true"),
        ("llm.temperature", "0.7"),
    ];
    for &(key, value) in &expected {
        assert_eq!(get_string_attr(received, key), Some(value));
    }
}
/// The newline-separated tool list attached to a span must arrive unchanged.
#[tokio::test]
#[serial]
async fn test_llm_span_contains_tool_information() {
    let mock_collector = MockOtelCollector::start().await;

    let collector_url = format!("{}/v1/traces", mock_collector.address());
    std::env::set_var("OTEL_COLLECTOR_URL", collector_url);
    std::env::set_var("OTEL_TRACING_ENABLED", "true");
    std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500");
    let trace_collector = Arc::new(TraceCollector::new(Some(true)));

    let tools_formatted = "get_weather(...)\nsearch_web(...)\ncalculate(...)";
    trace_collector.record_span(
        "archgw(llm)",
        SpanBuilder::new("POST /v1/chat/completions")
            .with_trace_id("test-trace-tools")
            .with_attribute("llm.request.tools", tools_formatted)
            .with_attribute("llm.model", "gpt-4o")
            .build(),
    );

    trace_collector.flush().await.expect("Failed to flush");
    tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await;

    let payloads = mock_collector.get_traces().await;
    let spans = extract_spans(&payloads);
    assert!(!spans.is_empty(), "No spans captured");

    let tools = get_string_attr(spans[0], "llm.request.tools").expect("Tools attribute missing");

    // Every tool entry must be present...
    for tool in ["get_weather(...)", "search_web(...)", "calculate(...)"] {
        assert!(tools.contains(tool));
    }
    // ...and the entries must still be separated by newlines
    assert!(tools.contains('\n'), "Tools should be newline-separated");
}
/// A truncated user-message preview (50 chars plus "...") must arrive on the
/// span as a string attribute.
#[tokio::test]
#[serial]
async fn test_llm_span_contains_user_message_preview() {
    let mock_collector = MockOtelCollector::start().await;

    std::env::set_var(
        "OTEL_COLLECTOR_URL",
        format!("{}/v1/traces", mock_collector.address()),
    );
    std::env::set_var("OTEL_TRACING_ENABLED", "true");
    std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500");
    let trace_collector = Arc::new(TraceCollector::new(Some(true)));

    let long_message =
        "This is a very long user message that should be truncated to 50 characters in the span";
    // The message is ASCII, so slicing at byte 50 cannot split a character
    let preview = if long_message.len() > 50 {
        format!("{}...", &long_message[..50])
    } else {
        long_message.to_string()
    };

    let span = SpanBuilder::new("POST /v1/messages")
        .with_trace_id("test-trace-preview")
        .with_attribute("llm.request.user_message_preview", &preview)
        .build();

    trace_collector.record_span("archgw(llm)", span);

    trace_collector.flush().await.expect("Failed to flush");
    tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await;

    let payloads = mock_collector.get_traces().await;
    let spans = extract_spans(&payloads);
    // Fail with a clear message rather than an index-out-of-bounds panic
    assert!(!spans.is_empty(), "No spans captured");
    let span = spans[0];

    let message_preview = get_string_attr(span, "llm.request.user_message_preview")
        .expect("User message preview attribute missing");
    assert!(message_preview.len() <= 53); // 50 chars + "..."
    assert!(message_preview.contains("..."));
}
/// The time-to-first-token attribute recorded on a streaming span must arrive
/// at the collector unchanged.
#[tokio::test]
#[serial]
async fn test_llm_span_contains_time_to_first_token() {
    let mock_collector = MockOtelCollector::start().await;

    std::env::set_var(
        "OTEL_COLLECTOR_URL",
        format!("{}/v1/traces", mock_collector.address()),
    );
    std::env::set_var("OTEL_TRACING_ENABLED", "true");
    std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500");
    let trace_collector = Arc::new(TraceCollector::new(Some(true)));

    let ttft_ms = "245"; // milliseconds as string

    let span = SpanBuilder::new("POST /v1/chat/completions")
        .with_trace_id("test-trace-ttft")
        .with_attribute("llm.is_streaming", "true")
        .with_attribute("llm.time_to_first_token_ms", ttft_ms)
        .build();

    trace_collector.record_span("archgw(llm)", span);

    trace_collector.flush().await.expect("Failed to flush");
    tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await;

    let payloads = mock_collector.get_traces().await;
    let spans = extract_spans(&payloads);
    // Fail with a clear message rather than an index-out-of-bounds panic
    assert!(!spans.is_empty(), "No spans captured");
    let span = spans[0];

    // Check TTFT attribute
    let ttft_attr = get_string_attr(span, "llm.time_to_first_token_ms");
    assert_eq!(ttft_attr, Some("245"));
}
/// A span whose name shows a path rewrite (">>") and which carries an upstream
/// target attribute must arrive with both intact.
#[tokio::test]
#[serial]
async fn test_llm_span_contains_upstream_path() {
    let mock_collector = MockOtelCollector::start().await;

    std::env::set_var(
        "OTEL_COLLECTOR_URL",
        format!("{}/v1/traces", mock_collector.address()),
    );
    std::env::set_var("OTEL_TRACING_ENABLED", "true");
    std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500");
    let trace_collector = Arc::new(TraceCollector::new(Some(true)));

    // Test Zhipu provider with path transformation
    let span = SpanBuilder::new("POST /v1/chat/completions >> /api/paas/v4/chat/completions")
        .with_trace_id("test-trace-upstream")
        .with_attribute("http.upstream_target", "/api/paas/v4/chat/completions")
        .with_attribute("llm.provider", "zhipu")
        .with_attribute("llm.model", "glm-4")
        .build();

    trace_collector.record_span("archgw(llm)", span);

    trace_collector.flush().await.expect("Failed to flush");
    tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await;

    let payloads = mock_collector.get_traces().await;
    let spans = extract_spans(&payloads);
    // Fail with a clear message rather than an index-out-of-bounds panic
    assert!(!spans.is_empty(), "No spans captured");
    let span = spans[0];

    // Operation name should show the transformation
    let name = span
        .get("name")
        .and_then(|v| v.as_str())
        .expect("Span name missing");
    assert!(
        name.contains(">>"),
        "Operation name should show path transformation"
    );

    // Check upstream target attribute
    let upstream = get_string_attr(span, "http.upstream_target");
    assert_eq!(upstream, Some("/api/paas/v4/chat/completions"));
}
/// Spans recorded under two different service names must both be delivered by
/// a single flush.
#[tokio::test]
#[serial]
async fn test_llm_span_multiple_services() {
    let mock_collector = MockOtelCollector::start().await;

    let collector_url = format!("{}/v1/traces", mock_collector.address());
    std::env::set_var("OTEL_COLLECTOR_URL", collector_url);
    std::env::set_var("OTEL_TRACING_ENABLED", "true");
    std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500");
    let trace_collector = Arc::new(TraceCollector::new(Some(true)));

    // One span per service, sharing a trace id
    trace_collector.record_span(
        "archgw(llm)",
        SpanBuilder::new("LLM Request")
            .with_trace_id("test-multi")
            .with_attribute("service", "llm")
            .build(),
    );
    trace_collector.record_span(
        "archgw(routing)",
        SpanBuilder::new("Routing Decision")
            .with_trace_id("test-multi")
            .with_attribute("service", "routing")
            .build(),
    );

    trace_collector.flush().await.expect("Failed to flush");
    tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await;

    let payloads = mock_collector.get_traces().await;
    let captured = extract_spans(&payloads).len();
    assert_eq!(captured, 2, "Should have captured both spans");
}
/// With tracing disabled, recording and flushing must send nothing to the
/// collector.
#[tokio::test]
#[serial]
async fn test_tracing_disabled_produces_no_spans() {
    let mock_collector = MockOtelCollector::start().await;

    // Build the TraceCollector with tracing turned OFF
    let collector_url = format!("{}/v1/traces", mock_collector.address());
    std::env::set_var("OTEL_COLLECTOR_URL", collector_url);
    std::env::set_var("OTEL_TRACING_ENABLED", "false");
    std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500");
    let trace_collector = Arc::new(TraceCollector::new(Some(false)));

    trace_collector.record_span(
        "archgw(llm)",
        SpanBuilder::new("Test Span")
            .with_trace_id("test-disabled")
            .build(),
    );

    // Flushing a disabled collector is expected to do nothing; ignore the result
    trace_collector.flush().await.ok();
    tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await;

    let payloads = mock_collector.get_traces().await;
    let captured = extract_spans(&payloads).len();
    assert_eq!(
        captured, 0,
        "No spans should be captured when tracing is disabled"
    );
}

View file

@ -10,7 +10,7 @@ services:
environment:
- ARCH_CONFIG_PATH=/app/arch_config.yaml
- OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}
- OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces
- OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317
volumes:
- ./config.yaml:/app/arch_config.yaml:ro
- /etc/ssl/cert.pem:/etc/ssl/cert.pem

View file

@ -11,7 +11,7 @@ services:
environment:
- ARCH_CONFIG_PATH=/app/arch_config.yaml
- OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}
- OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces
- OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317
volumes:
- ./config.yaml:/app/arch_config.yaml:ro
- /etc/ssl/cert.pem:/etc/ssl/cert.pem

View file

@ -11,20 +11,22 @@ services:
- ARCH_CONFIG_PATH=/app/arch_config.yaml
- OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:?ANTHROPIC_API_KEY environment variable is required but not set}
- OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces
- OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317
- OTEL_TRACING_ENABLED=true
- RUST_LOG=debug
volumes:
- ./config.yaml:/app/arch_config.yaml:ro
- /etc/ssl/cert.pem:/etc/ssl/cert.pem
open-web-ui:
image: dyrnq/open-webui:main
restart: always
ports:
- "8080:8080"
environment:
- DEFAULT_MODELS=gpt-4o-mini
- ENABLE_OPENAI_API=true
- OPENAI_API_BASE_URL=http://host.docker.internal:12000/v1
# open-web-ui:
# image: dyrnq/open-webui:main
# restart: always
# ports:
# - "8080:8080"
# environment:
# - DEFAULT_MODELS=gpt-4o-mini
# - ENABLE_OPENAI_API=true
# - OPENAI_API_BASE_URL=http://host.docker.internal:12000/v1
jaeger:
build:
@ -34,12 +36,12 @@ services:
- "4317:4317"
- "4318:4318"
prometheus:
build:
context: ../../shared/prometheus
# prometheus:
# build:
# context: ../../shared/prometheus
grafana:
build:
context: ../../shared/grafana
ports:
- "3000:3000"
# grafana:
# build:
# context: ../../shared/grafana
# ports:
# - "3000:3000"

View file

@ -16,4 +16,4 @@ services:
- OPENAI_API_KEY=${OPENAI_API_KEY:?error}
- MISTRAL_API_KEY=${MISTRAL_API_KEY:?error}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:?error}
- OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces
- OTEL_TRACING_GRPC_ENDPOINT=http://host.docker.internal:4317

View file

@ -1,31 +0,0 @@
POST http://localhost:4318/v1/traces
Content-Type: application/json
{
"resourceSpans": [
{
"resource": {
"attributes": [
{ "key": "service.name", "value": { "stringValue": "upstream-llm" } }
]
},
"scopeSpans": [
{
"scope": { "name": "default", "version": "1.0", "attributes": [] },
"spans": [
{
"traceId": "fa8f7c410c28092faafbd7d4a2f5e742",
"spanId": "4dc43055a07410d6",
"parentSpanId": "f0acd74216a5e179",
"name": "archgw",
"startTimeUnixNano": "1731363782228270000",
"endTimeUnixNano": "1731363787843156000",
"kind": 1,
"attributes": []
}
]
}
]
}
]
}