mirror of
https://github.com/katanemo/plano.git
synced 2026-06-17 15:25:17 +02:00
using Envoy to transport traces, not calling OTEL directly
This commit is contained in:
parent
0f9732358e
commit
120c923c11
4 changed files with 133 additions and 36 deletions
|
|
@ -469,6 +469,48 @@ static_resources:
|
|||
typed_config:
|
||||
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
|
||||
|
||||
- name: otel_collector_proxy
|
||||
address:
|
||||
socket_address:
|
||||
address: 0.0.0.0
|
||||
port_value: 9903
|
||||
traffic_direction: OUTBOUND
|
||||
filter_chains:
|
||||
- filters:
|
||||
- name: envoy.filters.network.http_connection_manager
|
||||
typed_config:
|
||||
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
|
||||
stat_prefix: otel_proxy
|
||||
codec_type: AUTO
|
||||
access_log:
|
||||
- name: envoy.access_loggers.file
|
||||
typed_config:
|
||||
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
|
||||
path: "/var/log/access_otel.log"
|
||||
format: |
|
||||
[%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%"
|
||||
route_config:
|
||||
name: otel_route
|
||||
virtual_hosts:
|
||||
- name: otel_backend
|
||||
domains: ["*"]
|
||||
routes:
|
||||
- match:
|
||||
prefix: "/v1/traces"
|
||||
route:
|
||||
cluster: otel_collector_http_proxy
|
||||
timeout: 5s
|
||||
retry_policy:
|
||||
retry_on: "5xx,connect-failure,refused-stream,reset"
|
||||
num_retries: 3
|
||||
per_try_timeout: 2s
|
||||
host_selection_retry_max_attempts: 5
|
||||
retriable_status_codes: [500, 502, 503, 504]
|
||||
http_filters:
|
||||
- name: envoy.filters.http.router
|
||||
typed_config:
|
||||
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
|
||||
|
||||
- name: egress_traffic_llm
|
||||
address:
|
||||
socket_address:
|
||||
|
|
@ -1033,4 +1075,34 @@ static_resources:
|
|||
socket_address:
|
||||
address: host.docker.internal
|
||||
port_value: 4318
|
||||
# Cluster for OTEL HTTP proxy with retry/circuit breaking
|
||||
- name: otel_collector_http_proxy
|
||||
connect_timeout: 2s
|
||||
type: STRICT_DNS
|
||||
dns_lookup_family: V4_ONLY
|
||||
lb_policy: ROUND_ROBIN
|
||||
load_assignment:
|
||||
cluster_name: otel_collector_http_proxy
|
||||
endpoints:
|
||||
- lb_endpoints:
|
||||
- endpoint:
|
||||
address:
|
||||
socket_address:
|
||||
address: host.docker.internal
|
||||
port_value: 4318
|
||||
# Circuit breaker configuration to prevent overwhelming OTEL collector
|
||||
circuit_breakers:
|
||||
thresholds:
|
||||
- priority: DEFAULT
|
||||
max_connections: 100
|
||||
max_pending_requests: 100
|
||||
max_requests: 100
|
||||
max_retries: 3
|
||||
# Health checking and outlier detection
|
||||
outlier_detection:
|
||||
consecutive_5xx: 5
|
||||
interval: 10s
|
||||
base_ejection_time: 30s
|
||||
max_ejection_percent: 50
|
||||
enforcing_consecutive_5xx: 100
|
||||
{% endif %}
|
||||
|
|
|
|||
|
|
@ -89,9 +89,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
|||
let model_aliases = Arc::new(arch_config.model_aliases.clone());
|
||||
|
||||
// Initialize trace collector and start background flusher
|
||||
let trace_collector = Arc::new(TraceCollector::from_env());
|
||||
// Tracing is enabled if the tracing config is present in arch_config.yaml
|
||||
// Pass Some(true/false) to override, or None to use env var OTEL_TRACING_ENABLED
|
||||
let tracing_enabled = if arch_config.tracing.is_some() {
|
||||
info!("Tracing configuration found in arch_config.yaml");
|
||||
Some(true)
|
||||
} else {
|
||||
info!("No tracing configuration in arch_config.yaml, will check OTEL_TRACING_ENABLED env var");
|
||||
None
|
||||
};
|
||||
let trace_collector = Arc::new(TraceCollector::new(tracing_enabled));
|
||||
let _flusher_handle = trace_collector.clone().start_background_flusher();
|
||||
info!("Trace collector initialized and background flusher started");
|
||||
|
||||
|
||||
loop {
|
||||
let (stream, _) = listener.accept().await?;
|
||||
|
|
|
|||
|
|
@ -33,3 +33,4 @@ trace-collection = ["tokio", "reqwest", "tracing"]
|
|||
[dev-dependencies]
|
||||
pretty_assertions = "1.4.1"
|
||||
serde_json = "1.0.64"
|
||||
tokio = { version = "1.44", features = ["sync", "time", "macros", "rt"] }
|
||||
|
|
|
|||
|
|
@ -25,6 +25,12 @@ pub fn parse_traceparent(traceparent: &str) -> (String, String) {
|
|||
///
|
||||
/// Supports multiple services, with each service (e.g., "archgw(routing)", "archgw(llm)")
|
||||
/// maintaining its own span queue. Flushes all services together periodically.
|
||||
///
|
||||
/// Tracing can be enabled/disabled in two ways:
|
||||
/// 1. Via arch_config.yaml: presence of `tracing` configuration section
|
||||
/// 2. Via environment variable: `OTEL_TRACING_ENABLED=true/false`
|
||||
///
|
||||
/// When disabled, span recording and flushing are no-ops.
|
||||
pub struct TraceCollector {
|
||||
/// Spans grouped by service name
|
||||
/// Key: service name (e.g., "archgw(routing)", "archgw(llm)")
|
||||
|
|
@ -32,48 +38,53 @@ pub struct TraceCollector {
|
|||
spans_by_service: Arc<Mutex<HashMap<String, VecDeque<Span>>>>,
|
||||
flush_interval: Duration,
|
||||
otel_url: String,
|
||||
/// Whether tracing is enabled
|
||||
enabled: bool,
|
||||
}
|
||||
|
||||
impl TraceCollector {
|
||||
/// Create a new trace collector
|
||||
///
|
||||
/// # Arguments
|
||||
/// * `flush_interval` - How often to flush buffered spans
|
||||
/// * `otel_url` - OTEL collector endpoint URL
|
||||
pub fn new(
|
||||
flush_interval: Duration,
|
||||
otel_url: String,
|
||||
) -> Self {
|
||||
Self {
|
||||
spans_by_service: Arc::new(Mutex::new(HashMap::new())),
|
||||
flush_interval,
|
||||
otel_url,
|
||||
}
|
||||
}
|
||||
|
||||
/// Create with defaults from environment or sensible defaults
|
||||
pub fn from_env() -> Self {
|
||||
let batch_size = std::env::var("TRACE_BATCH_SIZE")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(100);
|
||||
|
||||
/// * `enabled` - Whether tracing is enabled
|
||||
/// - `Some(true)` - Force enable tracing
|
||||
/// - `Some(false)` - Force disable tracing
|
||||
/// - `None` - Check `OTEL_TRACING_ENABLED` env var (defaults to true if not set)
|
||||
///
|
||||
/// Other parameters are read from environment variables:
|
||||
/// - `TRACE_FLUSH_INTERVAL_SECS` - Flush interval in seconds (default: 1)
|
||||
/// - `OTEL_COLLECTOR_URL` - OTEL collector endpoint (default: http://localhost:9903/v1/traces)
|
||||
pub fn new(enabled: Option<bool>) -> Self {
|
||||
let flush_interval_secs = std::env::var("TRACE_FLUSH_INTERVAL_SECS")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(1);
|
||||
|
||||
let otel_url = std::env::var("OTEL_COLLECTOR_URL")
|
||||
.unwrap_or_else(|_| "http://host.docker.internal:4318/v1/traces".to_string());
|
||||
.unwrap_or_else(|_| "http://localhost:9903/v1/traces".to_string());
|
||||
|
||||
// Determine if tracing is enabled:
|
||||
// 1. Use explicit parameter if provided
|
||||
// 2. Otherwise check OTEL_TRACING_ENABLED env var
|
||||
// 3. Default to true if neither is set
|
||||
let enabled = enabled.unwrap_or_else(|| {
|
||||
std::env::var("OTEL_TRACING_ENABLED")
|
||||
.ok()
|
||||
.and_then(|s| s.parse().ok())
|
||||
.unwrap_or(true)
|
||||
});
|
||||
|
||||
debug!(
|
||||
"TraceCollector initialized: batch_size={}, flush_interval={}s, url={}",
|
||||
batch_size, flush_interval_secs, otel_url
|
||||
"TraceCollector initialized: flush_interval={}s, url={}, enabled={}",
|
||||
flush_interval_secs, otel_url, enabled
|
||||
);
|
||||
|
||||
Self::new(
|
||||
Duration::from_secs(flush_interval_secs),
|
||||
Self {
|
||||
spans_by_service: Arc::new(Mutex::new(HashMap::new())),
|
||||
flush_interval: Duration::from_secs(flush_interval_secs),
|
||||
otel_url,
|
||||
)
|
||||
enabled,
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a span for a specific service
|
||||
|
|
@ -82,6 +93,11 @@ impl TraceCollector {
|
|||
/// * `service_name` - Name of the service (e.g., "archgw(routing)", "archgw(llm)")
|
||||
/// * `span` - The span to record
|
||||
pub fn record_span(&self, service_name: impl Into<String>, span: Span) {
|
||||
// Skip recording if tracing is disabled
|
||||
if !self.enabled {
|
||||
return;
|
||||
}
|
||||
|
||||
let service_name = service_name.into();
|
||||
|
||||
// Use try_lock to avoid blocking in async contexts
|
||||
|
|
@ -103,6 +119,11 @@ impl TraceCollector {
|
|||
/// Flush all buffered spans to the OTEL collector
|
||||
/// Builds ResourceSpans for each service with spans
|
||||
pub async fn flush(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
|
||||
// Skip flushing if tracing is disabled
|
||||
if !self.enabled {
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let mut spans_by_service = self.spans_by_service.lock().await;
|
||||
|
||||
if spans_by_service.is_empty() {
|
||||
|
|
@ -225,10 +246,7 @@ mod tests {
|
|||
|
||||
#[tokio::test]
|
||||
async fn test_collector_basic() {
|
||||
let collector = TraceCollector::new(
|
||||
Duration::from_secs(60),
|
||||
"http://test:4318/v1/traces".to_string(),
|
||||
);
|
||||
let collector = TraceCollector::new(Some(true));
|
||||
|
||||
let span = SpanBuilder::new("test_operation")
|
||||
.with_trace_id("abc123")
|
||||
|
|
@ -242,10 +260,7 @@ mod tests {
|
|||
#[tokio::test]
|
||||
async fn test_collector_auto_flush() {
|
||||
// Since batch-triggered flush behavior was removed, record two spans and verify both are buffered
|
||||
let collector = Arc::new(TraceCollector::new(
|
||||
Duration::from_secs(60),
|
||||
"http://test:4318/v1/traces".to_string(),
|
||||
));
|
||||
let collector = Arc::new(TraceCollector::new(Some(true)));
|
||||
|
||||
let span1 = SpanBuilder::new("test1").build();
|
||||
let span2 = SpanBuilder::new("test2").build();
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue