adding canonical tracing support via bright-staff

This commit is contained in:
Salman Paracha 2025-12-09 17:45:22 -08:00
parent 09c0b999b2
commit c748ea0a71
27 changed files with 1803 additions and 273 deletions

View file

@ -11,4 +11,5 @@ pub mod routing;
pub mod stats;
pub mod tokenizer;
pub mod tracing;
pub mod traces;
pub mod utils;

View file

@ -0,0 +1,259 @@
use super::shapes::Span;
use super::resource_span_builder::ResourceSpanBuilder;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{interval, Duration};
use tracing::{debug, error, warn};
/// Splits a W3C `traceparent` header value into its trace ID and parent span ID.
///
/// The value is expected to consist of four dash-separated fields, e.g.
/// `00-{trace_id}-{parent_span_id}-01`. Only the number of fields is checked;
/// the content of each field is passed through as-is.
///
/// Returns `(trace_id, parent_span_id)`. When the value does not have exactly
/// four fields a warning is logged and both strings are empty.
pub fn parse_traceparent(traceparent: &str) -> (String, String) {
    let fields: Vec<&str> = traceparent.split('-').collect();
    match fields.as_slice() {
        [_version, trace_id, parent_span_id, _flags] => {
            (String::from(*trace_id), String::from(*parent_span_id))
        }
        _ => {
            warn!("Invalid traceparent format: {}", traceparent);
            // Malformed header: hand back empty IDs rather than failing.
            (String::new(), String::new())
        }
    }
}
/// Buffers spans in memory and ships them to an OTEL collector in batches.
///
/// Spans are kept in one queue per service name (e.g., "archgw(routing)",
/// "archgw(llm)"). A flush drains every service's queue and sends all of them
/// in a single request.
pub struct TraceCollector {
/// Buffered spans grouped by service name.
/// Key: service name (e.g., "archgw(routing)", "archgw(llm)")
/// Value: queue of spans recorded for that service and not yet flushed
spans_by_service: Arc<Mutex<HashMap<String, VecDeque<Span>>>>,
/// Period between flushes performed by the background flusher task.
flush_interval: Duration,
/// URL of the OTEL collector endpoint that flushed spans are POSTed to.
otel_url: String,
}
impl TraceCollector {
/// Create a new trace collector
/// # Arguments
/// * `flush_interval` - How often to flush buffered spans
/// * `otel_url` - OTEL collector endpoint URL
pub fn new(
flush_interval: Duration,
otel_url: String,
) -> Self {
Self {
spans_by_service: Arc::new(Mutex::new(HashMap::new())),
flush_interval,
otel_url,
}
}
/// Create with defaults from environment or sensible defaults
pub fn from_env() -> Self {
let batch_size = std::env::var("TRACE_BATCH_SIZE")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(100);
let flush_interval_secs = std::env::var("TRACE_FLUSH_INTERVAL_SECS")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(1);
let otel_url = std::env::var("OTEL_COLLECTOR_URL")
.unwrap_or_else(|_| "http://host.docker.internal:4318/v1/traces".to_string());
debug!(
"TraceCollector initialized: batch_size={}, flush_interval={}s, url={}",
batch_size, flush_interval_secs, otel_url
);
Self::new(
Duration::from_secs(flush_interval_secs),
otel_url,
)
}
/// Record a span for a specific service
///
/// # Arguments
/// * `service_name` - Name of the service (e.g., "archgw(routing)", "archgw(llm)")
/// * `span` - The span to record
pub fn record_span(&self, service_name: impl Into<String>, span: Span) {
let service_name = service_name.into();
// Use try_lock to avoid blocking in async contexts
// If the lock is held, we skip recording (telemetry shouldn't block the app)
if let Ok(mut spans_by_service) = self.spans_by_service.try_lock() {
// Get or create the queue for this service
let spans = spans_by_service
.entry(service_name)
.or_insert_with(VecDeque::new);
spans.push_back(span);
} else {
// Lock contention - skip recording this span
debug!("Skipped span recording due to lock contention");
}
// Flushing is handled by the periodic background flusher (see `start_background_flusher`).
}
/// Flush all buffered spans to the OTEL collector
/// Builds ResourceSpans for each service with spans
pub async fn flush(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let mut spans_by_service = self.spans_by_service.lock().await;
if spans_by_service.is_empty() {
return Ok(());
}
// Snapshot and drain all services' spans
let service_batches: Vec<(String, Vec<Span>)> = spans_by_service
.iter_mut()
.filter_map(|(service_name, spans)| {
if spans.is_empty() {
None
} else {
Some((service_name.clone(), spans.drain(..).collect()))
}
})
.collect();
drop(spans_by_service); // Release lock before HTTP call
if service_batches.is_empty() {
return Ok(());
}
let total_spans: usize = service_batches.iter().map(|(_, spans)| spans.len()).sum();
debug!("Flushing {} spans across {} services to OTEL collector", total_spans, service_batches.len());
// Build canonical OTEL payload structure - one ResourceSpan per service
let resource_spans = self.build_resource_spans(service_batches);
match self.send_to_otel(resource_spans).await {
Ok(_) => {
debug!("Successfully flushed {} spans", total_spans);
Ok(())
}
Err(e) => {
warn!("Failed to send spans to OTEL collector: {:?}", e);
Err(e)
}
}
}
/// Build OTEL-compliant resource spans from collected spans, one ResourceSpan per service
fn build_resource_spans(&self, service_batches: Vec<(String, Vec<Span>)>) -> Vec<super::shapes::ResourceSpan> {
service_batches
.into_iter()
.map(|(service_name, spans)| {
ResourceSpanBuilder::new(&service_name)
.add_spans(spans)
.build()
})
.collect()
}
/// Send resource spans to OTEL collector
/// Serializes as {"resourceSpans": [...]} per OTEL spec
async fn send_to_otel(
&self,
resource_spans: Vec<super::shapes::ResourceSpan>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let client = reqwest::Client::new();
// Create OTEL payload with proper structure
let payload = serde_json::json!({
"resourceSpans": resource_spans
});
let response = client
.post(&self.otel_url)
.header("Content-Type", "application/json")
.json(&payload)
.timeout(Duration::from_secs(5))
.send()
.await?;
if !response.status().is_success() {
warn!(
"OTEL collector returned non-success status: {}",
response.status()
);
return Err(format!("OTEL collector error: {}", response.status()).into());
}
Ok(())
}
/// Start a background task that periodically flushes traces
/// Returns a join handle that can be used to stop the flusher
pub fn start_background_flusher(self: Arc<Self>) -> tokio::task::JoinHandle<()> {
let flush_interval = self.flush_interval;
tokio::spawn(async move {
let mut ticker = interval(flush_interval);
loop {
ticker.tick().await;
if let Err(e) = self.flush().await {
error!("Background trace flush failed: {:?}", e);
}
}
})
}
/// Get current number of buffered spans across all services (for testing/monitoring)
pub async fn buffered_count(&self) -> usize {
self.spans_by_service
.lock()
.await
.values()
.map(|spans| spans.len())
.sum()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::traces::SpanBuilder;

    /// A single recorded span is reported by `buffered_count`.
    #[tokio::test]
    async fn test_collector_basic() {
        let otel_url = "http://test:4318/v1/traces".to_string();
        let collector = TraceCollector::new(Duration::from_secs(60), otel_url);

        collector.record_span(
            "test-service",
            SpanBuilder::new("test_operation")
                .with_trace_id("abc123")
                .build(),
        );

        let buffered = collector.buffered_count().await;
        assert_eq!(buffered, 1);
    }

    /// Recording alone never triggers a flush: there is no batch-size based
    /// flushing, so every recorded span stays in the buffer.
    #[tokio::test]
    async fn test_collector_auto_flush() {
        let otel_url = "http://test:4318/v1/traces".to_string();
        let collector = Arc::new(TraceCollector::new(Duration::from_secs(60), otel_url));

        for name in ["test1", "test2"].iter() {
            collector.record_span("test-service", SpanBuilder::new(*name).build());
        }

        // Both spans are still waiting for the periodic flusher.
        let buffered = collector.buffered_count().await;
        assert_eq!(buffered, 2);
    }
}

View file

@ -0,0 +1,27 @@
// OpenTelemetry semantic convention constants for tracing.
//
// Keeping these strings in one place ensures consistency across the codebase
// and prevents typos in attribute keys.

/// Resource attribute keys following OTEL semantic conventions
pub mod resource {
/// Attribute key for the logical name of the service
pub const SERVICE_NAME: &str = "service.name";
/// Attribute key for the version of the service
pub const SERVICE_VERSION: &str = "service.version";
/// Attribute key for the service namespace/environment
pub const SERVICE_NAMESPACE: &str = "service.namespace";
/// Attribute key for the service instance ID
pub const SERVICE_INSTANCE_ID: &str = "service.instance.id";
}
/// Default values for the instrumentation scope attached to exported spans
pub mod scope {
/// Scope name used when the caller does not provide one
pub const DEFAULT_NAME: &str = "brightstaff.tracing";
/// Scope version used when the caller does not provide one
pub const DEFAULT_VERSION: &str = "1.0.0";
}

View file

@ -0,0 +1,23 @@
// Original tracing types (OTEL structures)
mod shapes;
// New tracing utilities
mod span_builder;
mod resource_span_builder;
mod constants;
#[cfg(feature = "trace-collection")]
mod collector;
// Re-export original types
pub use shapes::{
Span, Event, Traceparent, TraceparentNewError,
ResourceSpan, Resource, ScopeSpan, Scope, Attribute, AttributeValue,
};
// Re-export new utilities
pub use span_builder::{SpanBuilder, SpanKind};
pub use resource_span_builder::ResourceSpanBuilder;
pub use constants::*;
#[cfg(feature = "trace-collection")]
pub use collector::{TraceCollector, parse_traceparent};

View file

@ -0,0 +1,121 @@
use super::shapes::{ResourceSpan, Resource, ScopeSpan, Scope, Span, Attribute, AttributeValue};
use super::constants::{resource, scope};
use std::collections::HashMap;
/// Builder for creating OTEL ResourceSpan structures
///
/// Provides a fluent API for building the resource/scope/span hierarchy:
/// one resource (identified by its service name), one instrumentation scope,
/// and the spans that belong to it.
pub struct ResourceSpanBuilder {
/// Value emitted as the `service.name` resource attribute
service_name: String,
/// Additional resource attributes, keyed by attribute name
resource_attributes: HashMap<String, String>,
/// Instrumentation scope name
scope_name: String,
/// Instrumentation scope version
scope_version: String,
/// Spans collected so far, in insertion order
spans: Vec<Span>,
}
impl ResourceSpanBuilder {
    /// Creates a builder for the given service name.
    ///
    /// The scope name and version start out as the defaults from the `scope`
    /// constants; there are no custom resource attributes and no spans.
    pub fn new(service_name: impl Into<String>) -> Self {
        Self {
            service_name: service_name.into(),
            resource_attributes: HashMap::new(),
            scope_name: scope::DEFAULT_NAME.to_string(),
            scope_version: scope::DEFAULT_VERSION.to_string(),
            spans: Vec::new(),
        }
    }

    /// Adds a resource attribute (e.g., deployment.environment, host.name).
    ///
    /// Setting the same key again replaces the earlier value. The
    /// `service.name` key is reserved for the name passed to [`Self::new`];
    /// a custom attribute with that key is ignored by [`Self::build`].
    pub fn with_resource_attribute(
        mut self,
        key: impl Into<String>,
        value: impl Into<String>,
    ) -> Self {
        self.resource_attributes.insert(key.into(), value.into());
        self
    }

    /// Sets the instrumentation scope name.
    pub fn with_scope_name(mut self, name: impl Into<String>) -> Self {
        self.scope_name = name.into();
        self
    }

    /// Sets the instrumentation scope version.
    pub fn with_scope_version(mut self, version: impl Into<String>) -> Self {
        self.scope_version = version.into();
        self
    }

    /// Appends a single span.
    pub fn add_span(mut self, span: Span) -> Self {
        self.spans.push(span);
        self
    }

    /// Appends multiple spans, keeping their order.
    pub fn add_spans(mut self, spans: Vec<Span>) -> Self {
        self.spans.extend(spans);
        self
    }

    /// Builds the `ResourceSpan`, consuming the builder.
    ///
    /// The resource attributes start with `service.name`, followed by the
    /// custom attributes sorted by key. All spans end up in a single scope.
    pub fn build(self) -> ResourceSpan {
        // `service.name` always comes first and always carries the constructor's value.
        let mut attributes = vec![Attribute {
            key: resource::SERVICE_NAME.to_string(),
            value: AttributeValue {
                string_value: Some(self.service_name),
            },
        }];

        // Drop a custom `service.name` so the key is not emitted twice, and
        // sort by key because `HashMap` iteration order is unspecified and
        // would otherwise make the payload differ from run to run.
        let mut custom_attributes: Vec<(String, String)> = self
            .resource_attributes
            .into_iter()
            .filter(|(key, _)| key.as_str() != resource::SERVICE_NAME)
            .collect();
        custom_attributes.sort_unstable_by(|left, right| left.0.cmp(&right.0));

        attributes.extend(custom_attributes.into_iter().map(|(key, value)| Attribute {
            key,
            value: AttributeValue {
                string_value: Some(value),
            },
        }));

        let scope_span = ScopeSpan {
            scope: Scope {
                name: self.scope_name,
                version: self.scope_version,
                attributes: Vec::new(),
            },
            spans: self.spans,
        };

        ResourceSpan {
            resource: Resource { attributes },
            scope_spans: vec![scope_span],
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::traces::SpanBuilder;

    /// Builds a resource span with one custom attribute and two spans and
    /// checks the resulting resource/scope/span hierarchy.
    #[test]
    fn test_resource_span_builder() {
        let builder = ResourceSpanBuilder::new("test-service")
            .with_resource_attribute("deployment.environment", "production")
            .with_scope_name("test-scope");

        let first = SpanBuilder::new("operation1").build();
        let second = SpanBuilder::new("operation2").build();
        let resource_span = builder.add_span(first).add_span(second).build();

        // service.name + custom
        assert_eq!(resource_span.resource.attributes.len(), 2);

        let scope_spans = &resource_span.scope_spans;
        assert_eq!(scope_spans.len(), 1);
        assert_eq!(scope_spans[0].spans.len(), 2);
    }
}

View file

@ -0,0 +1,123 @@
use serde::{Deserialize, Serialize};
/// Top-level grouping of exported trace data: one resource together with the
/// scope-grouped spans attached to it.
#[derive(Serialize, Deserialize, Debug)]
pub struct ResourceSpan {
/// The resource the spans belong to
pub resource: Resource,
/// Spans grouped by instrumentation scope; serialized under the key `scopeSpans`
#[serde(rename = "scopeSpans")]
pub scope_spans: Vec<ScopeSpan>,
}
/// A resource, described purely by its list of attributes.
#[derive(Serialize, Deserialize, Debug)]
pub struct Resource {
/// Key/value attributes describing the resource
pub attributes: Vec<Attribute>,
}
/// A set of spans together with the instrumentation scope they are grouped under.
#[derive(Serialize, Deserialize, Debug)]
pub struct ScopeSpan {
/// The instrumentation scope for `spans`
pub scope: Scope,
/// The spans in this scope
pub spans: Vec<Span>,
}
/// An instrumentation scope: a name, a version and optional attributes.
#[derive(Serialize, Deserialize, Debug)]
pub struct Scope {
/// Scope name
pub name: String,
/// Scope version
pub version: String,
/// Key/value attributes attached to the scope
pub attributes: Vec<Attribute>,
}
/// A single span as it is serialized for export.
///
/// Field names are renamed to camelCase keys on serialization (e.g.
/// `trace_id` becomes `traceId`). Timestamps are carried as strings.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Span {
/// ID of the trace this span belongs to; serialized as `traceId`
#[serde(rename = "traceId")]
pub trace_id: String,
/// ID of this span; serialized as `spanId`
#[serde(rename = "spanId")]
pub span_id: String,
/// ID of the parent span, if any; serialized as `parentSpanId`
#[serde(rename = "parentSpanId")]
pub parent_span_id: Option<String>, // Optional in case there's no parent span
/// Operation name of the span
pub name: String,
/// Start timestamp as a string; serialized as `startTimeUnixNano`
/// (presumably nanoseconds since the Unix epoch, per the field name)
#[serde(rename = "startTimeUnixNano")]
pub start_time_unix_nano: String,
/// End timestamp as a string; serialized as `endTimeUnixNano`
/// (presumably nanoseconds since the Unix epoch, per the field name)
#[serde(rename = "endTimeUnixNano")]
pub end_time_unix_nano: String,
/// Numeric span kind
pub kind: u32,
/// Key/value attributes attached to the span
pub attributes: Vec<Attribute>,
/// Events recorded on the span, if any
pub events: Option<Vec<Event>>,
}
/// A named, timestamped event attached to a span.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Event {
/// Timestamp of the event as a string; serialized as `timeUnixNano`
#[serde(rename = "timeUnixNano")]
pub time_unix_nano: String,
/// Name of the event
pub name: String,
/// Key/value attributes attached to the event
pub attributes: Vec<Attribute>,
}
impl Event {
    /// Creates an event with the given name and no attributes.
    ///
    /// `time_unix_nano` is stored as its decimal string representation.
    pub fn new(name: String, time_unix_nano: u128) -> Self {
        Self {
            name,
            time_unix_nano: time_unix_nano.to_string(),
            attributes: Vec::new(),
        }
    }

    /// Appends a string-valued attribute to the event.
    ///
    /// Attributes are kept in insertion order; an existing attribute with the
    /// same key is not replaced.
    pub fn add_attribute(&mut self, key: String, value: String) {
        let value = AttributeValue {
            string_value: Some(value),
        };
        self.attributes.push(Attribute { key, value });
    }
}
/// A key/value attribute attached to a resource, scope, span or event.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct Attribute {
/// Attribute name
pub key: String,
/// Attribute value
pub value: AttributeValue,
}
/// The value of an [`Attribute`].
///
/// Only string values can be represented by this type as written.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct AttributeValue {
/// String payload of the value; serialized as `stringValue`
#[serde(rename = "stringValue")]
pub string_value: Option<String>, // Use Option to handle different value types
}
/// The four dash-separated fields of a `traceparent` header value.
///
/// The fields are stored exactly as they appeared in the header; see the
/// `TryFrom<String>` implementation for parsing and the `Display`
/// implementation for turning the value back into a header string.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so values can be logged,
/// compared in assertions, and used with `Result::unwrap_err`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct Traceparent {
    /// First field of the header (the version)
    pub version: String,
    /// Second field of the header (the trace ID)
    pub trace_id: String,
    /// Third field of the header (the parent span ID)
    pub parent_id: String,
    /// Fourth field of the header (the flags)
    pub flags: String,
}
impl std::fmt::Display for Traceparent {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}-{}-{}-{}",
self.version, self.trace_id, self.parent_id, self.flags
)
}
}
/// Error returned when a string cannot be converted into a [`Traceparent`].
#[derive(thiserror::Error, Debug)]
pub enum TraceparentNewError {
/// The input did not have the expected shape; carries the rejected input string
#[error("Invalid traceparent: \'{0}\'")]
InvalidTraceparent(String),
}
/// Parses a header string of the form `{version}-{trace_id}-{parent_id}-{flags}`.
///
/// Only the number of dash-separated fields is checked; the fields themselves
/// are stored unmodified. Any other field count yields
/// `TraceparentNewError::InvalidTraceparent` carrying the original input.
impl TryFrom<String> for Traceparent {
    type Error = TraceparentNewError;

    fn try_from(traceparent: String) -> Result<Self, Self::Error> {
        let fields: Vec<&str> = traceparent.split('-').collect();
        if let [version, trace_id, parent_id, flags] = fields.as_slice() {
            return Ok(Traceparent {
                version: String::from(*version),
                trace_id: String::from(*trace_id),
                parent_id: String::from(*parent_id),
                flags: String::from(*flags),
            });
        }
        Err(TraceparentNewError::InvalidTraceparent(traceparent))
    }
}

View file

@ -0,0 +1,193 @@
use super::shapes::{Span, Attribute, AttributeValue};
use std::collections::HashMap;
use std::time::SystemTime;
/// OpenTelemetry span kinds
/// https://opentelemetry.io/docs/specs/otel/trace/api/#spankind
///
/// The discriminants are the numeric values of the OTLP `SpanKind` enum,
/// because the kind is exported by casting the variant to `u32`. In OTLP,
/// `0` means "unspecified", `1` is INTERNAL and `3` is CLIENT.
#[derive(Debug, Clone, Copy)]
pub enum SpanKind {
    /// Default value. Indicates that the span represents an internal operation within an application
    Internal = 1,
    /// Indicates that the span describes a request to some remote service
    Client = 3,
}
/// Builder for creating OTEL-compliant spans with a fluent API
///
/// This is the recommended way to create spans with proper trace context.
/// The start time is captured when the builder is created; every other
/// setting is optional.
///
/// # Example
/// ```no_run
/// use common::traces::{SpanBuilder, SpanKind};
///
/// let span = SpanBuilder::new("router_chat")
///     .with_trace_id("abc123")
///     .with_parent_span_id("parent456")
///     .with_kind(SpanKind::Internal)
///     .with_attribute("http.method", "POST")
///     .with_attribute("http.path", "/v1/chat/completions")
///     .build();
/// ```
pub struct SpanBuilder {
/// Operation name of the span
name: String,
/// Trace ID to use; a random one is generated at build time when `None`
trace_id: Option<String>,
/// ID of the parent span, if any
parent_span_id: Option<String>,
/// Start time of the span
start_time: SystemTime,
/// Explicit end time; the time of the `build` call is used when `None`
end_time: Option<SystemTime>,
/// Span kind
kind: SpanKind,
/// String attributes, keyed by attribute name
attributes: HashMap<String, String>,
}
impl SpanBuilder {
/// Create a new span builder
///
/// # Arguments
/// * `name` - The operation name for this span (e.g., "router_chat", "determine_route")
pub fn new(name: impl Into<String>) -> Self {
Self {
name: name.into(),
trace_id: None,
parent_span_id: None,
start_time: SystemTime::now(),
end_time: None,
kind: SpanKind::Internal,
attributes: HashMap::new(),
}
}
/// Set the trace ID (extracted from traceparent or OpenTelemetry context)
pub fn with_trace_id(mut self, trace_id: impl Into<String>) -> Self {
self.trace_id = Some(trace_id.into());
self
}
/// Set the parent span ID to link this span to its parent
pub fn with_parent_span_id(mut self, parent_span_id: impl Into<String>) -> Self {
self.parent_span_id = Some(parent_span_id.into());
self
}
/// Set the span kind (defaults to Internal)
pub fn with_kind(mut self, kind: SpanKind) -> Self {
self.kind = kind;
self
}
/// Set explicit start time (defaults to now)
pub fn with_start_time(mut self, start_time: SystemTime) -> Self {
self.start_time = start_time;
self
}
/// Set explicit end time (defaults to build time)
pub fn with_end_time(mut self, end_time: SystemTime) -> Self {
self.end_time = Some(end_time);
self
}
/// Add a single attribute to the span
pub fn with_attribute(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
self.attributes.insert(key.into(), value.into());
self
}
/// Add multiple attributes at once
pub fn with_attributes(mut self, attrs: HashMap<String, String>) -> Self {
self.attributes.extend(attrs);
self
}
/// Build the span, consuming the builder
///
/// Creates a complete OTEL-compliant span with all provided attributes,
/// generating span_id and using provided or random trace_id.
pub fn build(self) -> Span {
let end_time = self.end_time.unwrap_or_else(SystemTime::now);
let start_nanos = system_time_to_nanos(self.start_time);
let end_nanos = system_time_to_nanos(end_time);
// Generate trace_id if not provided
let trace_id = self.trace_id.unwrap_or_else(|| generate_random_trace_id());
// Create attributes in OTEL format
let attributes: Vec<Attribute> = self.attributes
.into_iter()
.map(|(key, value)| Attribute {
key,
value: AttributeValue {
string_value: Some(value),
},
})
.collect();
// Build span directly without going through Span::new()
Span {
trace_id,
span_id: generate_random_span_id(),
parent_span_id: self.parent_span_id,
name: self.name,
start_time_unix_nano: format!("{}", start_nanos),
end_time_unix_nano: format!("{}", end_nanos),
kind: self.kind as u32,
attributes,
events: None,
}
}
}
/// Returns the number of nanoseconds between the UNIX epoch and `time`.
///
/// Times before the epoch yield `0`.
fn system_time_to_nanos(time: SystemTime) -> u128 {
    match time.duration_since(SystemTime::UNIX_EPOCH) {
        Ok(since_epoch) => since_epoch.as_nanos(),
        Err(_) => 0,
    }
}
/// Produces a random span ID: 8 random bytes rendered as 16 hex characters.
fn generate_random_span_id() -> String {
    use rand::RngCore;

    let mut id_bytes = [0u8; 8];
    rand::thread_rng().fill_bytes(&mut id_bytes);
    hex::encode(id_bytes)
}
/// Produces a random trace ID: 16 random bytes rendered as 32 hex characters.
fn generate_random_trace_id() -> String {
    use rand::RngCore;

    let mut id_bytes = [0u8; 16];
    rand::thread_rng().fill_bytes(&mut id_bytes);
    hex::encode(id_bytes)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Name, trace ID, parent ID and attributes set on the builder end up on the span.
    #[test]
    fn test_span_builder_basic() {
        let builder = SpanBuilder::new("test_operation")
            .with_trace_id("abc123")
            .with_parent_span_id("parent123")
            .with_attribute("key", "value");
        let built = builder.build();

        assert_eq!(built.name, "test_operation");
        assert_eq!(built.trace_id, "abc123");
        assert_eq!(built.parent_span_id, Some("parent123".to_string()));
        assert_eq!(built.attributes.len(), 1);
    }

    /// Without a parent span ID the span is a root span.
    #[test]
    fn test_span_builder_no_parent() {
        let builder = SpanBuilder::new("root_span").with_trace_id("xyz789");
        let built = builder.build();

        assert_eq!(built.name, "root_span");
        assert_eq!(built.trace_id, "xyz789");
        assert_eq!(built.parent_span_id, None);
    }
}