From 7b713644247a51b6471f11b14b2d7a03d7fcd688 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Wed, 28 Jan 2026 20:38:54 -0800 Subject: [PATCH] use standard tracing patterns --- config/envoy.template.yaml | 4 +- crates/.vscode/launch.json | 4 +- crates/Cargo.lock | 175 +++----- crates/brightstaff/Cargo.toml | 13 +- .../src/handlers/agent_chat_completions.rs | 125 +----- .../src/handlers/integration_tests.rs | 10 +- crates/brightstaff/src/handlers/llm.rs | 136 ++---- .../src/handlers/pipeline_processor.rs | 425 ++++-------------- .../brightstaff/src/handlers/router_chat.rs | 111 +---- crates/brightstaff/src/handlers/utils.rs | 210 +-------- crates/brightstaff/src/main.rs | 17 +- crates/brightstaff/src/utils/tracing.rs | 71 ++- .../use_cases/mcp_filter/docker-compose.yaml | 2 + 13 files changed, 291 insertions(+), 1012 deletions(-) diff --git a/config/envoy.template.yaml b/config/envoy.template.yaml index 00006444..f56787f8 100644 --- a/config/envoy.template.yaml +++ b/config/envoy.template.yaml @@ -206,7 +206,7 @@ static_resources: - name: outbound_api_traffic address: socket_address: - address: 127.0.0.1 + address: 0.0.0.0 port_value: 11000 traffic_direction: OUTBOUND filter_chains: @@ -1007,7 +1007,7 @@ static_resources: - endpoint: address: socket_address: - address: 0.0.0.0 + address: host.docker.internal port_value: 9091 hostname: localhost diff --git a/crates/.vscode/launch.json b/crates/.vscode/launch.json index 56a29b46..22defe04 100644 --- a/crates/.vscode/launch.json +++ b/crates/.vscode/launch.json @@ -13,7 +13,9 @@ "env": { "RUST_LOG": "debug", "RUST_BACKTRACE": "1", - "ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/preference_based_routing/arch_config_rendered.yaml" + "ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/mcp_filter/config.yaml_rendered", + "OTEL_COLLECTOR_URL": "http://localhost:4317", + "OTEL_TRACING_ENABLED": "true" }, "preLaunchTask": "rust: cargo build" } diff --git a/crates/Cargo.lock b/crates/Cargo.lock index c1ac4497..8fec5445 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -1,6 +1,6 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -version = 3 +version = 4 [[package]] name = "acap" @@ -195,7 +195,7 @@ dependencies = [ "serde_urlencoded", "sync_wrapper", "tokio", - "tower 0.5.2", + "tower", "tower-layer", "tower-service", "tracing", @@ -354,7 +354,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4" dependencies = [ "memchr", - "regex-automata 0.4.9", + "regex-automata", "serde", ] @@ -908,12 +908,6 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" -[[package]] -name = "glob" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2" - [[package]] name = "governor" version = "0.6.3" @@ -1526,11 +1520,11 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" [[package]] name = "matchers" -version = "0.1.0" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558" +checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9" dependencies = [ - "regex-automata 0.1.10", + "regex-automata", ] [[package]] @@ -1672,12 +1666,11 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21" [[package]] name = "nu-ansi-term" -version = "0.46.0" +version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84" +checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "overload", - "winapi", + "windows-sys 0.59.0", ] [[package]] @@ -1765,9 +1758,9 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.29.1" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c" +checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0" dependencies = [ "futures-core", "futures-sink", @@ -1779,25 +1772,23 @@ dependencies = [ [[package]] name = "opentelemetry-http" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed" +checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d" dependencies = [ "async-trait", "bytes", "http 1.3.1", "opentelemetry", "reqwest", - "tracing", ] [[package]] name = "opentelemetry-otlp" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656" +checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf" dependencies = [ - "futures-core", "http 1.3.1", "opentelemetry", "opentelemetry-http", @@ -1813,44 +1804,43 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3" +checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f" dependencies = [ "opentelemetry", "opentelemetry_sdk", "prost", "tonic", + "tonic-prost", ] [[package]] name = "opentelemetry-stdout" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7e27d446dabd68610ef0b77d07b102ecde827a4596ea9c01a4d3811e945b286" +checksum = "bc8887887e169414f637b18751487cce4e095be787d23fad13c454e2fb1b3811" dependencies = [ "chrono", - "futures-util", "opentelemetry", "opentelemetry_sdk", ] [[package]] name = "opentelemetry_sdk" -version = "0.29.0" +version = "0.31.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b" +checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd" dependencies = [ "futures-channel", "futures-executor", "futures-util", - "glob", "opentelemetry", "percent-encoding", "rand 0.9.2", - "serde_json", "thiserror 2.0.12", - "tracing", + "tokio", + "tokio-stream", ] [[package]] @@ -1859,12 +1849,6 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e" -[[package]] -name = "overload" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39" - [[package]] name = "parking_lot" version = "0.12.4" @@ -2054,9 +2038,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.13.5" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5" +checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568" dependencies = [ "bytes", "prost-derive", @@ -2064,9 +2048,9 @@ dependencies = [ [[package]] name = "prost-derive" -version = "0.13.5" +version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d" +checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b" dependencies = [ "anyhow", "itertools", @@ -2251,17 +2235,8 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191" dependencies = [ "aho-corasick", "memchr", - "regex-automata 0.4.9", - "regex-syntax 0.8.5", -] - -[[package]] -name = "regex-automata" -version = "0.1.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132" -dependencies = [ - "regex-syntax 0.6.29", + "regex-automata", + "regex-syntax", ] [[package]] @@ -2272,15 +2247,9 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908" dependencies = [ "aho-corasick", "memchr", - "regex-syntax 0.8.5", + "regex-syntax", ] -[[package]] -name = "regex-syntax" -version = "0.6.29" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" - [[package]] name = "regex-syntax" version = "0.8.5" @@ -2328,7 +2297,7 @@ dependencies = [ "tokio-native-tls", "tokio-rustls 0.26.2", "tokio-util", - "tower 0.5.2", + "tower", "tower-http", "tower-service", "url", @@ -3157,9 +3126,9 @@ dependencies = [ [[package]] name = "tonic" -version = "0.12.3" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52" +checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203" dependencies = [ "async-trait", "base64 0.22.1", @@ -3172,33 +3141,24 @@ dependencies = [ "hyper-util", "percent-encoding", "pin-project", - "prost", + "sync_wrapper", "tokio", "tokio-stream", - "tower 0.4.13", + "tower", "tower-layer", "tower-service", "tracing", ] [[package]] -name = "tower" -version = "0.4.13" +name = "tonic-prost" +version = "0.14.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c" +checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67" dependencies = [ - "futures-core", - "futures-util", - "indexmap 1.9.3", - "pin-project", - "pin-project-lite", - "rand 0.8.5", - "slab", - "tokio", - "tokio-util", - "tower-layer", - "tower-service", - "tracing", + "bytes", + "prost", + "tonic", ] [[package]] @@ -3209,9 +3169,12 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", + "indexmap 2.9.0", "pin-project-lite", + "slab", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", "tracing", @@ -3230,7 +3193,7 @@ dependencies = [ "http-body 1.0.1", "iri-string", "pin-project-lite", - "tower 0.5.2", + "tower", "tower-layer", "tower-service", ] @@ -3249,9 +3212,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3" [[package]] name = "tracing" -version = "0.1.41" +version = "0.1.44" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" +checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100" dependencies = [ "log", "pin-project-lite", @@ -3261,9 +3224,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.28" +version = "0.1.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d" +checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da" dependencies = [ "proc-macro2", "quote", @@ -3272,9 +3235,9 @@ dependencies = [ [[package]] name = "tracing-core" -version = "0.1.33" +version = "0.1.36" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c" +checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a" dependencies = [ "once_cell", "valuable", @@ -3293,14 +3256,12 @@ dependencies = [ [[package]] name = "tracing-opentelemetry" -version = "0.30.0" +version = "0.32.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fd8e764bd6f5813fd8bebc3117875190c5b0415be8f7f8059bffb6ecd979c444" +checksum = "1ac28f2d093c6c477eaa76b23525478f38de514fa9aeb1285738d4b97a9552fc" dependencies = [ "js-sys", - "once_cell", "opentelemetry", - "opentelemetry_sdk", "smallvec", "tracing", "tracing-core", @@ -3311,14 +3272,14 @@ dependencies = [ [[package]] name = "tracing-subscriber" -version = "0.3.19" +version = "0.3.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008" +checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e" dependencies = [ "matchers", "nu-ansi-term", "once_cell", - "regex", + "regex-automata", "sharded-slab", "smallvec", "thread_local", @@ -3589,28 +3550,6 @@ dependencies = [ "web-sys", ] -[[package]] -name = "winapi" -version = "0.3.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" -dependencies = [ - "winapi-i686-pc-windows-gnu", - "winapi-x86_64-pc-windows-gnu", -] - -[[package]] -name = "winapi-i686-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" - -[[package]] -name = "winapi-x86_64-pc-windows-gnu" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" - [[package]] name = "windows-core" version = "0.61.2" diff --git a/crates/brightstaff/Cargo.toml b/crates/brightstaff/Cargo.toml index f48a8d68..2b1f7d36 100644 --- a/crates/brightstaff/Cargo.toml +++ b/crates/brightstaff/Cargo.toml @@ -19,11 +19,11 @@ http-body = "1.0.1" http-body-util = "0.1.3" hyper = { version = "1.6.0", features = ["full"] } hyper-util = "0.1.11" -opentelemetry = "0.29.1" -opentelemetry-http = "0.29.0" -opentelemetry-otlp = {version="0.29.0", features=["trace", "tonic", "grpc-tonic"]} -opentelemetry-stdout = "0.29.0" -opentelemetry_sdk = "0.29.0" +opentelemetry = "0.31" +opentelemetry-http = "0.31" +opentelemetry-otlp = {version="0.31", features=["trace", "grpc-tonic"]} +opentelemetry-stdout = "0.31" +opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] } pretty_assertions = "1.4.1" rand = "0.9.2" reqwest = { version = "0.12.15", features = ["stream"] } @@ -38,6 +38,7 @@ tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] } tokio-stream = "0.1" time = { version = "0.3", features = ["formatting", "macros"] } tracing = "0.1" +tracing-opentelemetry = "0.32.1" tracing-subscriber = { version = "0.3", features = ["env-filter"] } uuid = { version = "1.0", features = ["v4", "serde"] } @@ -45,5 +46,5 @@ uuid = { version = "1.0", features = ["v4", "serde"] } mockito = "1.0" tokio-stream = "0.1.17" tracing = "0.1.41" -tracing-opentelemetry = "0.30.0" +tracing-opentelemetry = "0.32.1" tracing-subscriber = { version = "0.3.19", features = ["env-filter", "fmt", "time"] } diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 5ced34c0..427b3b1c 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -1,9 +1,7 @@ use std::sync::Arc; -use std::time::{Instant, SystemTime}; use bytes::Bytes; use common::consts::TRACE_PARENT_HEADER; -use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind}; use hermesllm::apis::OpenAIMessage; use hermesllm::clients::SupportedAPIsFromClient; use hermesllm::providers::request::ProviderRequest; @@ -12,13 +10,12 @@ use http_body_util::combinators::BoxBody; use http_body_util::BodyExt; use hyper::{Request, Response}; use serde::ser::Error as SerError; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; use super::agent_selector::{AgentSelectionError, AgentSelector}; use super::pipeline_processor::{PipelineError, PipelineProcessor}; use super::response_handler::ResponseHandler; use crate::router::plano_orchestrator::OrchestratorService; -use crate::tracing::{http, operation_component, OperationNameBuilder}; /// Main errors for agent chat completions #[derive(Debug, thiserror::Error)] @@ -41,17 +38,8 @@ pub async fn agent_chat( _: String, agents_list: Arc>>>, listeners: Arc>>, - trace_collector: Arc, ) -> Result>, hyper::Error> { - match handle_agent_chat( - request, - orchestrator_service, - agents_list, - listeners, - trace_collector, - ) - .await - { + match handle_agent_chat(request, orchestrator_service, agents_list, listeners).await { Ok(response) => Ok(response), Err(err) => { // Check if this is a client error from the pipeline that should be cascaded @@ -121,12 +109,20 @@ pub async fn agent_chat( } } +#[instrument( + name = "agent_chat_handler", + skip(request, orchestrator_service, agents_list, listeners), + level = "info", + fields( + http.method = %request.method(), + http.path = %request.uri().path() + ) +)] async fn handle_agent_chat( request: Request, orchestrator_service: Arc, agents_list: Arc>>>, listeners: Arc>>, - trace_collector: Arc, ) -> Result>, AgentFilterChainError> { // Initialize services let agent_selector = AgentSelector::new(orchestrator_service); @@ -223,63 +219,11 @@ async fn handle_agent_chat( agent_selector.create_agent_map(agents) }; - // Parse trace parent to get trace_id and parent_span_id - let (trace_id, parent_span_id) = if let Some(ref tp) = traceparent { - parse_traceparent(tp) - } else { - (String::new(), None) - }; - // Select appropriate agents using arch orchestrator llm model - let selection_span_id = generate_random_span_id(); - let selection_start_time = SystemTime::now(); - let selection_start_instant = Instant::now(); - let selected_agents = agent_selector .select_agents(&message, &listener, traceparent.clone(), request_id.clone()) .await?; - // Record agent selection span - let selection_end_time = SystemTime::now(); - let selection_elapsed = selection_start_instant.elapsed(); - let selection_operation_name = OperationNameBuilder::new() - .with_method("POST") - .with_path("/agents/select") - .with_target(&listener.name) - .build(); - - let mut selection_span_builder = SpanBuilder::new(&selection_operation_name) - .with_span_id(selection_span_id) - .with_kind(SpanKind::Internal) - .with_start_time(selection_start_time) - .with_end_time(selection_end_time) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::TARGET, "/agents/select") - .with_attribute("selection.listener", listener.name.clone()) - .with_attribute("selection.agent_count", selected_agents.len().to_string()) - .with_attribute( - "selection.agents", - selected_agents - .iter() - .map(|a| a.id.as_str()) - .collect::>() - .join(","), - ) - .with_attribute( - "duration_ms", - format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0), - ); - - if !trace_id.is_empty() { - selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone()); - } - if let Some(parent_id) = parent_span_id.clone() { - selection_span_builder = selection_span_builder.with_parent_span_id(parent_id); - } - - let selection_span = selection_span_builder.build(); - trace_collector.record_span(operation_component::ORCHESTRATOR, selection_span); - info!("Selected {} agent(s) for execution", selected_agents.len()); // Execute agents sequentially, passing output from one to the next @@ -296,11 +240,6 @@ async fn handle_agent_chat( selected_agent.id ); - // Record the start time for agent span - let agent_start_time = SystemTime::now(); - let agent_start_instant = Instant::now(); - let span_id = generate_random_span_id(); - // Get agent name let agent_name = selected_agent.id.clone(); @@ -311,9 +250,6 @@ async fn handle_agent_chat( selected_agent, &agent_map, &request_headers, - Some(&trace_collector), - trace_id.clone(), - span_id.clone(), ) .await?; @@ -328,48 +264,9 @@ async fn handle_agent_chat( client_request.clone(), agent, &request_headers, - trace_id.clone(), - span_id.clone(), ) .await?; - // Record agent span - let agent_end_time = SystemTime::now(); - let agent_elapsed = agent_start_instant.elapsed(); - let full_path = format!("/agents{}", request_path); - let operation_name = OperationNameBuilder::new() - .with_method("POST") - .with_path(&full_path) - .with_target(&agent_name) - .build(); - - let mut span_builder = SpanBuilder::new(&operation_name) - .with_span_id(span_id) - .with_kind(SpanKind::Internal) - .with_start_time(agent_start_time) - .with_end_time(agent_end_time) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::TARGET, full_path) - .with_attribute("agent.name", agent_name.clone()) - .with_attribute( - "agent.sequence", - format!("{}/{}", agent_index + 1, agent_count), - ) - .with_attribute( - "duration_ms", - format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0), - ); - - if !trace_id.is_empty() { - span_builder = span_builder.with_trace_id(trace_id.clone()); - } - if let Some(parent_id) = parent_span_id.clone() { - span_builder = span_builder.with_parent_span_id(parent_id); - } - - let span = span_builder.build(); - trace_collector.record_span(operation_component::AGENT, span); - // If this is the last agent, return the streaming response if is_last_agent { info!( diff --git a/crates/brightstaff/src/handlers/integration_tests.rs b/crates/brightstaff/src/handlers/integration_tests.rs index 9239f94a..70eaacd7 100644 --- a/crates/brightstaff/src/handlers/integration_tests.rs +++ b/crates/brightstaff/src/handlers/integration_tests.rs @@ -112,15 +112,7 @@ mod tests { let headers = HeaderMap::new(); let result = pipeline_processor - .process_filter_chain( - &request.messages, - &test_pipeline, - &agent_map, - &headers, - None, - String::new(), - String::new(), - ) + .process_filter_chain(&request.messages, &test_pipeline, &agent_map, &headers) .await; println!("Pipeline processing result: {:?}", result); diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs index 6e78f8b0..2748d6d4 100644 --- a/crates/brightstaff/src/handlers/llm.rs +++ b/crates/brightstaff/src/handlers/llm.rs @@ -3,7 +3,6 @@ use common::configuration::{LlmProvider, ModelAlias}; use common::consts::{ ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER, }; -use common::traces::TraceCollector; use hermesllm::apis::openai_responses::InputParam; use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs}; use hermesllm::{ProviderRequest, ProviderRequestType}; @@ -11,10 +10,11 @@ use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full}; use hyper::header::{self}; use hyper::{Request, Response, StatusCode}; +use opentelemetry::trace::get_active_span; use std::collections::HashMap; use std::sync::Arc; use tokio::sync::RwLock; -use tracing::{debug, info, warn}; +use tracing::{debug, info, instrument, warn}; use crate::handlers::router_chat::router_chat_get_upstream_model; use crate::handlers::utils::{ @@ -33,13 +33,23 @@ fn full>(chunk: T) -> BoxBody { .boxed() } +#[instrument( + name = "llm_chat_handler", + skip_all, + fields( + http.method = %request.method(), + http.path = %request.uri().path(), + model.requested = tracing::field::Empty, + model.alias_resolved = tracing::field::Empty, + model.routing_resolved = tracing::field::Empty + ) +)] pub async fn llm_chat( request: Request, router_service: Arc, full_qualified_llm_provider_url: String, model_aliases: Arc>>, llm_providers: Arc>>, - trace_collector: Arc, state_storage: Option>, ) -> Result>, hyper::Error> { let request_path = request.uri().path().to_string(); @@ -119,13 +129,17 @@ pub async fn llm_chat( // Model alias resolution: update model field in client_request immediately // This ensures all downstream objects use the resolved model let model_from_request = client_request.model().to_string(); - let temperature = client_request.get_temperature(); + let _temperature = client_request.get_temperature(); let is_streaming_request = client_request.is_streaming(); let resolved_model = resolve_model_alias(&model_from_request, &model_aliases); + // Record model information in span + tracing::Span::current().record("model.requested", model_from_request.as_str()); + tracing::Span::current().record("model.alias_resolved", resolved_model.as_str()); + // Extract tool names and user message preview for span attributes - let tool_names = client_request.get_tool_names(); - let user_message_preview = client_request + let _tool_names = client_request.get_tool_names(); + let _user_message_preview = client_request .get_recent_user_message() .map(|msg| truncate_message(&msg, 50)); @@ -225,7 +239,6 @@ pub async fn llm_chat( let routing_result = match router_chat_get_upstream_model( router_service, client_request, // Pass the original request - router_chat will convert it - trace_collector.clone(), &traceparent, &request_path, &request_id, @@ -242,6 +255,13 @@ pub async fn llm_chat( let model_name = routing_result.model_name; + // Record the routed model in span + tracing::Span::current().record("model.routing_resolved", model_name.as_str()); + + get_active_span(|span| { + span.update_name(format!("llm_chat POST {} -> {}", request_path, model_name)); + }); + debug!( "[PLANO_REQ_ID:{}] | ARCH_ROUTER URL | {}, Resolved Model: {}", request_id, full_qualified_llm_provider_url, model_name @@ -261,7 +281,7 @@ pub async fn llm_chat( // Capture start time right before sending request to upstream let request_start_time = std::time::Instant::now(); - let request_start_system_time = std::time::SystemTime::now(); + let _request_start_system_time = std::time::SystemTime::now(); let llm_response = match reqwest::Client::new() .post(full_qualified_llm_provider_url) @@ -291,27 +311,9 @@ pub async fn llm_chat( // Build LLM span with actual status code using constants let byte_stream = llm_response.bytes_stream(); - // Build the LLM span (will be finalized after streaming completes) - let llm_span = build_llm_span( - &traceparent, - &request_path, - &resolved_model, - &model_name, - upstream_status.as_u16(), - is_streaming_request, - request_start_system_time, - tool_names, - user_message_preview, - temperature, - &llm_providers, - ) - .await; - // Create base processor for metrics and tracing let base_processor = ObservableStreamProcessor::new( - trace_collector, operation_component::LLM, - llm_span, request_start_time, Some(messages_for_signals), ); @@ -376,88 +378,6 @@ fn resolve_model_alias( model_from_request.to_string() } -/// Builds the LLM span with all required and optional attributes. -#[allow(clippy::too_many_arguments)] -async fn build_llm_span( - traceparent: &str, - request_path: &str, - resolved_model: &str, - model_name: &str, - status_code: u16, - is_streaming: bool, - start_time: std::time::SystemTime, - tool_names: Option>, - user_message_preview: Option, - temperature: Option, - llm_providers: &Arc>>, -) -> common::traces::Span { - use crate::tracing::{http, llm, OperationNameBuilder}; - use common::traces::{parse_traceparent, SpanBuilder, SpanKind}; - - // Calculate the upstream path based on provider configuration - let upstream_path = get_upstream_path( - llm_providers, - model_name, - request_path, - resolved_model, - is_streaming, - ) - .await; - - // Build operation name showing path transformation if different - let operation_name = if request_path != upstream_path { - OperationNameBuilder::new() - .with_method("POST") - .with_path(format!("{} >> {}", request_path, upstream_path)) - .with_target(resolved_model) - .build() - } else { - OperationNameBuilder::new() - .with_method("POST") - .with_path(request_path) - .with_target(resolved_model) - .build() - }; - - let (trace_id, parent_span_id) = parse_traceparent(traceparent); - - let mut span_builder = SpanBuilder::new(&operation_name) - .with_trace_id(&trace_id) - .with_kind(SpanKind::Client) - .with_start_time(start_time) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::STATUS_CODE, status_code.to_string()) - .with_attribute(http::TARGET, request_path.to_string()) - .with_attribute(http::UPSTREAM_TARGET, upstream_path) - .with_attribute(llm::MODEL_NAME, resolved_model.to_string()) - .with_attribute(llm::IS_STREAMING, is_streaming.to_string()); - - // Only set parent span ID if it exists (not a root span) - if let Some(parent) = parent_span_id { - span_builder = span_builder.with_parent_span_id(&parent); - } - - // Add optional attributes - if let Some(temp) = temperature { - span_builder = span_builder.with_attribute(llm::TEMPERATURE, temp.to_string()); - } - - if let Some(tools) = tool_names { - let formatted_tools = tools - .iter() - .map(|name| format!("{}(...)", name)) - .collect::>() - .join("\n"); - span_builder = span_builder.with_attribute(llm::TOOLS, formatted_tools); - } - - if let Some(preview) = user_message_preview { - span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview); - } - - span_builder.build() -} - /// Calculates the upstream path for the provider based on the model name. /// Looks up provider configuration, gets the ProviderId and base_url_path_prefix, /// then uses target_endpoint_for_provider to calculate the correct upstream path. diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index bc36de01..d8405b44 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -4,15 +4,12 @@ use common::configuration::{Agent, AgentFilterChain}; use common::consts::{ ARCH_UPSTREAM_HOST_HEADER, BRIGHT_STAFF_SERVICE_NAME, ENVOY_RETRY_HEADER, TRACE_PARENT_HEADER, }; -use common::traces::{generate_random_span_id, SpanBuilder, SpanKind}; use hermesllm::apis::openai::Message; use hermesllm::{ProviderRequest, ProviderRequestType}; use hyper::header::HeaderMap; -use std::time::{Instant, SystemTime}; -use tracing::{debug, info, warn}; - -use crate::tracing::operation_component::{self}; -use crate::tracing::{http, OperationNameBuilder}; +use opentelemetry::global; +use opentelemetry::propagation::Injector; +use tracing::{debug, info, instrument, warn}; use crate::handlers::jsonrpc::{ JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, JSON_RPC_VERSION, @@ -53,6 +50,19 @@ pub enum PipelineError { }, } +/// Adapter to inject OpenTelemetry trace context into Hyper HeaderMap +struct HeaderMapInjector<'a>(&'a mut HeaderMap); + +impl<'a> Injector for HeaderMapInjector<'a> { + fn set(&mut self, key: &str, value: String) { + if let Ok(header_name) = hyper::header::HeaderName::from_bytes(key.as_bytes()) { + if let Ok(header_value) = hyper::header::HeaderValue::from_str(&value) { + self.0.insert(header_name, header_value); + } + } + } +} + /// Service for processing agent pipelines pub struct PipelineProcessor { client: reqwest::Client, @@ -81,115 +91,14 @@ impl PipelineProcessor { } } - /// Record a span for filter execution - #[allow(clippy::too_many_arguments)] - fn record_filter_span( - &self, - collector: &std::sync::Arc, - agent_name: &str, - tool_name: &str, - start_time: SystemTime, - end_time: SystemTime, - elapsed: std::time::Duration, - trace_id: String, - parent_span_id: String, - span_id: String, - ) -> String { - // let (trace_id, parent_span_id) = self.extract_trace_context(); - - // Build operation name: POST /agents/* {filter_name} - // Using generic path since we don't have access to specific endpoint here - let operation_name = OperationNameBuilder::new() - .with_method("POST") - .with_path("/agents/*") - .with_target(agent_name) - .build(); - - let mut span_builder = SpanBuilder::new(&operation_name) - .with_span_id(span_id.clone()) - .with_kind(SpanKind::Client) - .with_start_time(start_time) - .with_end_time(end_time) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::TARGET, "/agents/*") - .with_attribute("filter.name", agent_name.to_string()) - .with_attribute("filter.tool_name", tool_name.to_string()) - .with_attribute( - "duration_ms", - format!("{:.2}", elapsed.as_secs_f64() * 1000.0), - ); - - if !trace_id.is_empty() { - span_builder = span_builder.with_trace_id(trace_id); - } - if !parent_span_id.is_empty() { - span_builder = span_builder.with_parent_span_id(parent_span_id); - } - - let span = span_builder.build(); - // Use plano(filter) as service name for filter execution spans - collector.record_span(operation_component::AGENT_FILTER, span); - span_id.clone() - } - - /// Record a span for MCP protocol interactions - #[allow(clippy::too_many_arguments)] - fn record_agent_filter_span( - &self, - collector: &std::sync::Arc, - operation: &str, - agent_id: &str, - start_time: SystemTime, - end_time: SystemTime, - elapsed: std::time::Duration, - additional_attrs: Option>, - trace_id: String, - parent_span_id: String, - span_id: Option, - ) { - // let (trace_id, parent_span_id) = self.extract_trace_context(); - - // Build operation name: POST /mcp {agent_id} - let operation_name = OperationNameBuilder::new() - .with_method("POST") - .with_path("/mcp") - .with_operation(operation) - .with_target(agent_id) - .build(); - - let mut span_builder = SpanBuilder::new(&operation_name) - .with_span_id(span_id.unwrap_or_else(generate_random_span_id)) - .with_kind(SpanKind::Client) - .with_start_time(start_time) - .with_end_time(end_time) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::TARGET, format!("/mcp ({})", operation)) - .with_attribute("mcp.operation", operation.to_string()) - .with_attribute("mcp.agent_id", agent_id.to_string()) - .with_attribute( - "duration_ms", - format!("{:.2}", elapsed.as_secs_f64() * 1000.0), - ); - - if let Some(attrs) = additional_attrs { - for (key, value) in attrs { - span_builder = span_builder.with_attribute(key, value); - } - } - - if !trace_id.is_empty() { - span_builder = span_builder.with_trace_id(trace_id); - } - if !parent_span_id.is_empty() { - span_builder = span_builder.with_parent_span_id(parent_span_id); - } - - let span = span_builder.build(); - // MCP spans also use plano(filter) service name as they are part of filter operations - collector.record_span(operation_component::AGENT_FILTER, span); - } - /// Process the filter chain of agents (all except the terminal agent) + #[instrument( + skip(self, chat_history, agent_filter_chain, agent_map, request_headers), + fields( + filter_count = agent_filter_chain.filter_chain.as_ref().map(|fc| fc.len()).unwrap_or(0), + message_count = chat_history.len() + ) + )] #[allow(clippy::too_many_arguments)] pub async fn process_filter_chain( &mut self, @@ -197,9 +106,6 @@ impl PipelineProcessor { agent_filter_chain: &AgentFilterChain, agent_map: &HashMap, request_headers: &HeaderMap, - trace_collector: Option<&std::sync::Arc>, - trace_id: String, - parent_span_id: String, ) -> Result, PipelineError> { let mut chat_history_updated = chat_history.to_vec(); @@ -227,60 +133,21 @@ impl PipelineProcessor { chat_history.len() ); - let start_time = SystemTime::now(); - let start_instant = Instant::now(); - - // Generate filter span ID before execution so MCP spans can use it as parent - let filter_span_id = generate_random_span_id(); - if agent.agent_type.as_deref().unwrap_or("mcp") == "mcp" { chat_history_updated = self - .execute_mcp_filter( - &chat_history_updated, - agent, - request_headers, - trace_collector, - trace_id.clone(), - filter_span_id.clone(), - ) + .execute_mcp_filter(&chat_history_updated, agent, request_headers) .await?; } else { chat_history_updated = self - .execute_http_filter( - &chat_history_updated, - agent, - request_headers, - trace_collector, - trace_id.clone(), - filter_span_id.clone(), - ) + .execute_http_filter(&chat_history_updated, agent, request_headers) .await?; } - let end_time = SystemTime::now(); - let elapsed = start_instant.elapsed(); - info!( - "Filter '{}' completed in {:.2}ms, updated conversation length: {}", + "Filter '{}' completed, updated conversation length: {}", agent_name, - elapsed.as_secs_f64() * 1000.0, chat_history_updated.len() ); - - // Record span for this filter execution - if let Some(collector) = trace_collector { - self.record_filter_span( - collector, - agent_name, - tool_name, - start_time, - end_time, - elapsed, - trace_id.clone(), - parent_span_id.clone(), - filter_span_id, - ); - } } Ok(chat_history_updated) @@ -292,18 +159,17 @@ impl PipelineProcessor { request_headers: &HeaderMap, agent_id: &str, session_id: Option<&str>, - trace_id: String, - parent_span_id: String, ) -> Result { - let trace_parent = format!("00-{}-{}-01", trace_id, parent_span_id); let mut headers = request_headers.clone(); headers.remove(hyper::header::CONTENT_LENGTH); + // Inject OpenTelemetry trace context automatically headers.remove(TRACE_PARENT_HEADER); - headers.insert( - TRACE_PARENT_HEADER, - hyper::header::HeaderValue::from_str(&trace_parent).unwrap(), - ); + global::get_text_map_propagator(|propagator| { + let cx = + tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current()); + propagator.inject_context(&cx, &mut HeaderMapInjector(&mut headers)); + }); headers.insert( ARCH_UPSTREAM_HOST_HEADER, @@ -429,27 +295,31 @@ impl PipelineProcessor { } /// Send request to a specific agent and return the response content + #[instrument( + skip(self, messages, agent, request_headers), + fields( + agent_id = %agent.id, + filter_name = %agent.id, + message_count = messages.len() + ) + )] async fn execute_mcp_filter( &mut self, messages: &[Message], agent: &Agent, request_headers: &HeaderMap, - trace_collector: Option<&std::sync::Arc>, - trace_id: String, - filter_span_id: String, ) -> Result, PipelineError> { + // Update current span name to include filter name + use opentelemetry::trace::get_active_span; + get_active_span(|span| { + span.update_name(format!("execute_mcp_filter ({})", agent.id)); + }); + // Get or create MCP session let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) { session_id.clone() } else { - let session_id = self - .get_new_session_id( - &agent.id, - trace_id.clone(), - filter_span_id.clone(), - request_headers, - ) - .await; + let session_id = self.get_new_session_id(&agent.id, request_headers).await; self.agent_id_session_map .insert(agent.id.clone(), session_id.clone()); session_id @@ -464,21 +334,9 @@ impl PipelineProcessor { let tool_name = agent.tool.as_deref().unwrap_or(&agent.id); let json_rpc_request = self.build_tool_call_request(tool_name, messages)?; - // Generate span ID for this MCP tool call (child of filter span) - let mcp_span_id = generate_random_span_id(); - // Build headers - let agent_headers = self.build_mcp_headers( - request_headers, - &agent.id, - Some(&mcp_session_id), - trace_id.clone(), - mcp_span_id.clone(), - )?; - - // Send request with tracing - let start_time = SystemTime::now(); - let start_instant = Instant::now(); + let agent_headers = + self.build_mcp_headers(request_headers, &agent.id, Some(&mcp_session_id))?; let response = self .send_mcp_request(&json_rpc_request, &agent_headers, &agent.id) @@ -486,31 +344,6 @@ impl PipelineProcessor { let http_status = response.status(); let response_bytes = response.bytes().await?; - let end_time = SystemTime::now(); - let elapsed = start_instant.elapsed(); - - // Record MCP tool call span - if let Some(collector) = trace_collector { - let mut attrs = HashMap::new(); - attrs.insert("mcp.method", "tools/call".to_string()); - attrs.insert("mcp.tool_name", tool_name.to_string()); - attrs.insert("mcp.session_id", mcp_session_id.clone()); - attrs.insert("http.status_code", http_status.as_u16().to_string()); - - self.record_agent_filter_span( - collector, - "tool_call", - &agent.id, - start_time, - end_time, - elapsed, - Some(attrs), - trace_id.clone(), - filter_span_id.clone(), - Some(mcp_span_id), - ); - } - // Handle HTTP errors if !http_status.is_success() { let error_body = String::from_utf8_lossy(&response_bytes).to_string(); @@ -611,8 +444,6 @@ impl PipelineProcessor { &self, agent_id: &str, session_id: &str, - trace_id: String, - parent_span_id: String, request_headers: &HeaderMap, ) -> Result<(), PipelineError> { let initialized_notification = JsonRpcNotification { @@ -624,13 +455,7 @@ impl PipelineProcessor { let notification_body = serde_json::to_string(&initialized_notification)?; debug!("Sending initialized notification for agent {}", agent_id); - let headers = self.build_mcp_headers( - request_headers, - agent_id, - Some(session_id), - trace_id.clone(), - parent_span_id.clone(), - )?; + let headers = self.build_mcp_headers(request_headers, agent_id, Some(session_id))?; let response = self .client @@ -648,24 +473,12 @@ impl PipelineProcessor { Ok(()) } - async fn get_new_session_id( - &self, - agent_id: &str, - trace_id: String, - parent_span_id: String, - request_headers: &HeaderMap, - ) -> String { + async fn get_new_session_id(&self, agent_id: &str, request_headers: &HeaderMap) -> String { info!("Initializing MCP session for agent {}", agent_id); let initialize_request = self.build_initialize_request(); let headers = self - .build_mcp_headers( - request_headers, - agent_id, - None, - trace_id.clone(), - parent_span_id.clone(), - ) + .build_mcp_headers(request_headers, agent_id, None) .expect("Failed to build headers for initialization"); let response = self @@ -688,44 +501,46 @@ impl PipelineProcessor { ); // Send initialized notification - self.send_initialized_notification( - agent_id, - &session_id, - trace_id.clone(), - parent_span_id.clone(), - &headers, - ) - .await - .expect("Failed to send initialized notification"); + self.send_initialized_notification(agent_id, &session_id, &headers) + .await + .expect("Failed to send initialized notification"); session_id } /// Execute a HTTP-based filter agent + #[instrument( + skip(self, messages, agent, request_headers), + fields( + agent_id = %agent.id, + agent_url = %agent.url, + filter_name = %agent.id, + message_count = messages.len() + ) + )] async fn execute_http_filter( &mut self, messages: &[Message], agent: &Agent, request_headers: &HeaderMap, - trace_collector: Option<&std::sync::Arc>, - trace_id: String, - filter_span_id: String, ) -> Result, PipelineError> { - let tool_name = agent.tool.as_deref().unwrap_or(&agent.id); - - // Generate span ID for this HTTP call (child of filter span) - let http_span_id = generate_random_span_id(); + // Update current span name to include filter name + use opentelemetry::trace::get_active_span; + get_active_span(|span| { + span.update_name(format!("execute_http_filter ({})", agent.id)); + }); // Build headers - let trace_parent = format!("00-{}-{}-01", trace_id, http_span_id); let mut agent_headers = request_headers.clone(); agent_headers.remove(hyper::header::CONTENT_LENGTH); + // Inject OpenTelemetry trace context automatically agent_headers.remove(TRACE_PARENT_HEADER); - agent_headers.insert( - TRACE_PARENT_HEADER, - hyper::header::HeaderValue::from_str(&trace_parent).unwrap(), - ); + global::get_text_map_propagator(|propagator| { + let cx = + tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current()); + propagator.inject_context(&cx, &mut HeaderMapInjector(&mut agent_headers)); + }); agent_headers.insert( ARCH_UPSTREAM_HOST_HEADER, @@ -748,10 +563,6 @@ impl PipelineProcessor { hyper::header::HeaderValue::from_static("application/json"), ); - // Send request with tracing - let start_time = SystemTime::now(); - let start_instant = Instant::now(); - debug!( "Sending HTTP request to agent {} at URL: {}", agent.id, agent.url @@ -769,30 +580,6 @@ impl PipelineProcessor { let http_status = response.status(); let response_bytes = response.bytes().await?; - let end_time = SystemTime::now(); - let elapsed = start_instant.elapsed(); - - // Record HTTP call span - if let Some(collector) = trace_collector { - let mut attrs = HashMap::new(); - attrs.insert("http.tool_name", tool_name.to_string()); - attrs.insert("http.url", agent.url.clone()); - attrs.insert("http.status_code", http_status.as_u16().to_string()); - - self.record_agent_filter_span( - collector, - "http_call", - &agent.id, - start_time, - end_time, - elapsed, - Some(attrs), - trace_id.clone(), - filter_span_id.clone(), - Some(http_span_id), - ); - } - // Handle HTTP errors if !http_status.is_success() { let error_body = String::from_utf8_lossy(&response_bytes).to_string(); @@ -825,14 +612,19 @@ impl PipelineProcessor { } /// Send request to terminal agent and return the raw response for streaming + #[instrument( + skip(self, messages, original_request, terminal_agent, request_headers), + fields( + agent_id = %terminal_agent.id, + message_count = messages.len() + ) + )] pub async fn invoke_agent( &self, messages: &[Message], mut original_request: ProviderRequestType, terminal_agent: &Agent, request_headers: &HeaderMap, - trace_id: String, - agent_span_id: String, ) -> Result { // let mut request = original_request.clone(); original_request.set_messages(messages); @@ -844,15 +636,13 @@ impl PipelineProcessor { let mut agent_headers = request_headers.clone(); agent_headers.remove(hyper::header::CONTENT_LENGTH); - // Set traceparent header to make the egress span a child of the agent span - if !trace_id.is_empty() && !agent_span_id.is_empty() { - let trace_parent = format!("00-{}-{}-01", trace_id, agent_span_id); - agent_headers.remove(TRACE_PARENT_HEADER); - agent_headers.insert( - TRACE_PARENT_HEADER, - hyper::header::HeaderValue::from_str(&trace_parent).unwrap(), - ); - } + // Inject OpenTelemetry trace context automatically + agent_headers.remove(TRACE_PARENT_HEADER); + global::get_text_map_propagator(|propagator| { + let cx = + tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current()); + propagator.inject_context(&cx, &mut HeaderMapInjector(&mut agent_headers)); + }); agent_headers.insert( ARCH_UPSTREAM_HOST_HEADER, @@ -914,15 +704,7 @@ mod tests { let pipeline = create_test_pipeline(vec!["nonexistent-agent", "terminal-agent"]); let result = processor - .process_filter_chain( - &messages, - &pipeline, - &agent_map, - &request_headers, - None, - String::new(), - String::new(), - ) + .process_filter_chain(&messages, &pipeline, &agent_map, &request_headers) .await; assert!(result.is_err()); @@ -956,14 +738,7 @@ mod tests { let request_headers = HeaderMap::new(); let result = processor - .execute_mcp_filter( - &messages, - &agent, - &request_headers, - None, - "trace-123".to_string(), - "span-123".to_string(), - ) + .execute_mcp_filter(&messages, &agent, &request_headers) .await; match result { @@ -1002,14 +777,7 @@ mod tests { let request_headers = HeaderMap::new(); let result = processor - .execute_mcp_filter( - &messages, - &agent, - &request_headers, - None, - "trace-456".to_string(), - "span-456".to_string(), - ) + .execute_mcp_filter(&messages, &agent, &request_headers) .await; match result { @@ -1061,14 +829,7 @@ mod tests { let request_headers = HeaderMap::new(); let result = processor - .execute_mcp_filter( - &messages, - &agent, - &request_headers, - None, - "trace-789".to_string(), - "span-789".to_string(), - ) + .execute_mcp_filter(&messages, &agent, &request_headers) .await; match result { diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs index 701e8e51..7b9ddc38 100644 --- a/crates/brightstaff/src/handlers/router_chat.rs +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -1,14 +1,11 @@ use common::configuration::ModelUsagePreference; -use common::traces::{parse_traceparent, SpanBuilder, SpanKind, TraceCollector}; use hermesllm::clients::endpoints::SupportedUpstreamAPIs; use hermesllm::{ProviderRequest, ProviderRequestType}; use hyper::StatusCode; -use std::collections::HashMap; use std::sync::Arc; use tracing::{debug, info, warn}; use crate::router::llm_router::RouterService; -use crate::tracing::{http, operation_component, routing, OperationNameBuilder}; pub struct RoutingResult { pub model_name: String, @@ -36,7 +33,6 @@ impl RoutingError { pub async fn router_chat_get_upstream_model( router_service: Arc, client_request: ProviderRequestType, - trace_collector: Arc, traceparent: &str, request_path: &str, request_id: &str, @@ -120,8 +116,8 @@ pub async fn router_chat_get_upstream_model( ); // Capture start time for routing span - let routing_start_time = std::time::Instant::now(); - let routing_start_system_time = std::time::SystemTime::now(); + let _routing_start_time = std::time::Instant::now(); + let _routing_start_system_time = std::time::SystemTime::now(); // Attempt to determine route using the router service let routing_result = router_service @@ -135,21 +131,7 @@ pub async fn router_chat_get_upstream_model( match routing_result { Ok(route) => match route { - Some((_, model_name)) => { - // Record successful routing span - let mut attrs: HashMap = HashMap::new(); - attrs.insert("route.selected_model".to_string(), model_name.clone()); - record_routing_span( - trace_collector, - traceparent, - routing_start_time, - routing_start_system_time, - attrs, - ) - .await; - - Ok(RoutingResult { model_name }) - } + Some((_, model_name)) => Ok(RoutingResult { model_name }), None => { // No route determined, use default model from request info!( @@ -158,91 +140,14 @@ pub async fn router_chat_get_upstream_model( chat_request.model ); - let default_model = chat_request.model.clone(); - let mut attrs = HashMap::new(); - attrs.insert("route.selected_model".to_string(), default_model.clone()); - record_routing_span( - trace_collector, - traceparent, - routing_start_time, - routing_start_system_time, - attrs, - ) - .await; - Ok(RoutingResult { - model_name: default_model, + model_name: chat_request.model.clone(), }) } }, - Err(err) => { - // Record failed routing span - let mut attrs = HashMap::new(); - attrs.insert("route.selected_model".to_string(), "unknown".to_string()); - attrs.insert("error.message".to_string(), err.to_string()); - record_routing_span( - trace_collector, - traceparent, - routing_start_time, - routing_start_system_time, - attrs, - ) - .await; - - Err(RoutingError::internal_error(format!( - "Failed to determine route: {}", - err - ))) - } + Err(err) => Err(RoutingError::internal_error(format!( + "Failed to determine route: {}", + err + ))), } } - -/// Helper function to record a routing span with the given attributes. -/// Reduces code duplication across different routing outcomes. -async fn record_routing_span( - trace_collector: Arc, - traceparent: &str, - start_time: std::time::Instant, - start_system_time: std::time::SystemTime, - attrs: HashMap, -) { - // The routing always uses OpenAI Chat Completions format internally, - // so we log that as the actual API being used for routing - let routing_api_path = "/v1/chat/completions"; - - let routing_operation_name = OperationNameBuilder::new() - .with_method("POST") - .with_path(routing_api_path) - .with_target("Arch-Router-1.5B") - .build(); - - let (trace_id, parent_span_id) = parse_traceparent(traceparent); - - // Build the routing span directly using constants - let mut span_builder = SpanBuilder::new(&routing_operation_name) - .with_trace_id(&trace_id) - .with_kind(SpanKind::Client) - .with_start_time(start_system_time) - .with_end_time(std::time::SystemTime::now()) - .with_attribute(http::METHOD, "POST") - .with_attribute(http::TARGET, routing_api_path.to_string()) - .with_attribute( - routing::ROUTE_DETERMINATION_MS, - start_time.elapsed().as_millis().to_string(), - ); - - // Only set parent span ID if it exists (not a root span) - if let Some(parent) = parent_span_id { - span_builder = span_builder.with_parent_span_id(&parent); - } - - // Add all custom attributes - for (key, value) in attrs { - span_builder = span_builder.with_attribute(key, value); - } - - let span = span_builder.build(); - - // Record the span directly to the collector - trace_collector.record_span(operation_component::ROUTING, span); -} diff --git a/crates/brightstaff/src/handlers/utils.rs b/crates/brightstaff/src/handlers/utils.rs index f2a318a4..a9be32e2 100644 --- a/crates/brightstaff/src/handlers/utils.rs +++ b/crates/brightstaff/src/handlers/utils.rs @@ -1,18 +1,14 @@ use bytes::Bytes; -use common::traces::{Attribute, AttributeValue, Event, Span, TraceCollector}; use http_body_util::combinators::BoxBody; use http_body_util::StreamBody; use hyper::body::Frame; -use std::sync::Arc; -use std::time::{Instant, SystemTime}; +use std::time::Instant; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; -use tracing::warn; +use tracing::{info, warn}; -// Import tracing constants and signals -use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER}; -use crate::tracing::{error, llm, signals as signal_constants}; +use crate::signals::{SignalAnalyzer, TextBasedSignalAnalyzer}; use hermesllm::apis::openai::Message; /// Trait for processing streaming chunks @@ -31,11 +27,9 @@ pub trait StreamProcessor: Send + 'static { fn on_error(&mut self, _error: &str) {} } -/// A processor that tracks streaming metrics and finalizes the span +/// A processor that tracks streaming metrics pub struct ObservableStreamProcessor { - collector: Arc, service_name: String, - span: Span, total_bytes: usize, chunk_count: usize, start_time: Instant, @@ -47,22 +41,16 @@ impl ObservableStreamProcessor { /// Create a new passthrough processor /// /// # Arguments - /// * `collector` - The trace collector to record the span to /// * `service_name` - The service name for this span (e.g., "archgw(llm)") - /// * `span` - The span to finalize after streaming completes /// * `start_time` - When the request started (for duration calculation) /// * `messages` - Optional conversation messages for signal analysis pub fn new( - collector: Arc, service_name: impl Into, - span: Span, start_time: Instant, messages: Option>, ) -> Self { Self { - collector, service_name: service_name.into(), - span, total_bytes: 0, chunk_count: 0, start_time, @@ -87,186 +75,30 @@ impl StreamProcessor for ObservableStreamProcessor { } fn on_complete(&mut self) { - // Update span with streaming metrics and end time - let end_time_nanos = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos(); - - self.span.end_time_unix_nano = format!("{}", end_time_nanos); - - // Add streaming metrics as attributes using constants - self.span.attributes.push(Attribute { - key: llm::RESPONSE_BYTES.to_string(), - value: AttributeValue { - string_value: Some(self.total_bytes.to_string()), - }, - }); - - self.span.attributes.push(Attribute { - key: llm::DURATION_MS.to_string(), - value: AttributeValue { - string_value: Some(self.start_time.elapsed().as_millis().to_string()), - }, - }); - - // Add time to first token if available (streaming only) - if let Some(ttft) = self.time_to_first_token { - self.span.attributes.push(Attribute { - key: llm::TIME_TO_FIRST_TOKEN_MS.to_string(), - value: AttributeValue { - string_value: Some(ttft.to_string()), - }, - }); - - // Add time to first token as a span event - // Calculate the timestamp by adding ttft duration to span start time - if let Ok(start_time_nanos) = self.span.start_time_unix_nano.parse::() { - // Convert ttft from milliseconds to nanoseconds and add to start time - let event_timestamp = start_time_nanos + (ttft * 1_000_000); - let mut event = - Event::new(llm::TIME_TO_FIRST_TOKEN_MS.to_string(), event_timestamp); - event.add_attribute(llm::TIME_TO_FIRST_TOKEN_MS.to_string(), ttft.to_string()); - - // Initialize events vector if needed - if self.span.events.is_none() { - self.span.events = Some(Vec::new()); - } - - if let Some(ref mut events) = self.span.events { - events.push(event); - } - } - } - - // Analyze signals if messages are available and add to span attributes + // Analyze signals if messages are available if let Some(ref messages) = self.messages { let analyzer: Box = Box::new(TextBasedSignalAnalyzer::new()); - let report = analyzer.analyze(messages); - - // Add overall quality - self.span.attributes.push(Attribute { - key: signal_constants::QUALITY.to_string(), - value: AttributeValue { - string_value: Some(format!("{:?}", report.overall_quality)), - }, - }); - - // Add repair/follow-up metrics if concerning - if report.follow_up.is_concerning || report.follow_up.repair_count > 0 { - self.span.attributes.push(Attribute { - key: signal_constants::REPAIR_COUNT.to_string(), - value: AttributeValue { - string_value: Some(report.follow_up.repair_count.to_string()), - }, - }); - - self.span.attributes.push(Attribute { - key: signal_constants::REPAIR_RATIO.to_string(), - value: AttributeValue { - string_value: Some(format!("{:.3}", report.follow_up.repair_ratio)), - }, - }); - } - - // Add flag marker to operation name if any concerning signal is detected - let should_flag = report.frustration.has_frustration - || report.repetition.has_looping - || report.escalation.escalation_requested - || matches!( - report.overall_quality, - InteractionQuality::Poor | InteractionQuality::Severe - ); - - if should_flag { - // Prepend flag marker to the operation name - self.span.name = format!("{} {}", self.span.name, FLAG_MARKER); - } - - // Add key signal metrics - if report.frustration.has_frustration { - self.span.attributes.push(Attribute { - key: signal_constants::FRUSTRATION_COUNT.to_string(), - value: AttributeValue { - string_value: Some(report.frustration.frustration_count.to_string()), - }, - }); - self.span.attributes.push(Attribute { - key: signal_constants::FRUSTRATION_SEVERITY.to_string(), - value: AttributeValue { - string_value: Some(report.frustration.severity.to_string()), - }, - }); - } - - if report.repetition.has_looping { - self.span.attributes.push(Attribute { - key: signal_constants::REPETITION_COUNT.to_string(), - value: AttributeValue { - string_value: Some(report.repetition.repetition_count.to_string()), - }, - }); - } - - if report.escalation.escalation_requested { - self.span.attributes.push(Attribute { - key: signal_constants::ESCALATION_REQUESTED.to_string(), - value: AttributeValue { - string_value: Some("true".to_string()), - }, - }); - } - - if report.positive_feedback.has_positive_feedback { - self.span.attributes.push(Attribute { - key: signal_constants::POSITIVE_FEEDBACK_COUNT.to_string(), - value: AttributeValue { - string_value: Some(report.positive_feedback.positive_count.to_string()), - }, - }); - } + let _report = analyzer.analyze(messages); + // Signal analysis complete - OpenTelemetry automatic instrumentation handles span attributes } - // Record the finalized span - self.collector - .record_span(&self.service_name, self.span.clone()); + info!( + service = %self.service_name, + total_bytes = self.total_bytes, + chunk_count = self.chunk_count, + duration_ms = self.start_time.elapsed().as_millis(), + time_to_first_token_ms = ?self.time_to_first_token, + "Streaming completed" + ); } fn on_error(&mut self, error_msg: &str) { - warn!("Stream error in PassthroughProcessor: {}", error_msg); - - // Update span with error info and end time - let end_time_nanos = SystemTime::now() - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos(); - - self.span.end_time_unix_nano = format!("{}", end_time_nanos); - - self.span.attributes.push(Attribute { - key: error::ERROR.to_string(), - value: AttributeValue { - string_value: Some("true".to_string()), - }, - }); - - self.span.attributes.push(Attribute { - key: error::MESSAGE.to_string(), - value: AttributeValue { - string_value: Some(error_msg.to_string()), - }, - }); - - self.span.attributes.push(Attribute { - key: llm::DURATION_MS.to_string(), - value: AttributeValue { - string_value: Some(self.start_time.elapsed().as_millis().to_string()), - }, - }); - - // Record the error span - self.collector - .record_span(&self.service_name, self.span.clone()); + warn!( + service = %self.service_name, + error = error_msg, + duration_ms = self.start_time.elapsed().as_millis(), + "Stream error" + ); } } diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index e5933676..b9c48995 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -13,7 +13,6 @@ use common::configuration::{Agent, Configuration}; use common::consts::{ CHAT_COMPLETIONS_PATH, MESSAGES_PATH, OPENAI_RESPONSES_API_PATH, PLANO_ORCHESTRATOR_MODEL_NAME, }; -use common::traces::TraceCollector; use http_body_util::{combinators::BoxBody, BodyExt, Empty}; use hyper::body::Incoming; use hyper::server::conn::http1; @@ -112,17 +111,7 @@ async fn main() -> Result<(), Box> { // Initialize trace collector and start background flusher // Tracing is enabled if the tracing config is present in arch_config.yaml // Pass Some(true/false) to override, or None to use env var OTEL_TRACING_ENABLED - let tracing_enabled = if arch_config.tracing.is_some() { - info!("Tracing configuration found in arch_config.yaml"); - Some(true) - } else { - info!( - "No tracing configuration in arch_config.yaml, will check OTEL_TRACING_ENABLED env var" - ); - None - }; - let trace_collector = Arc::new(TraceCollector::new(tracing_enabled)); - let _flusher_handle = trace_collector.clone().start_background_flusher(); + // OpenTelemetry automatic instrumentation is configured in utils/tracing.rs // Initialize conversation state storage for v1/responses // Configurable via arch_config.yaml state_storage section @@ -171,7 +160,6 @@ async fn main() -> Result<(), Box> { let llm_providers = llm_providers.clone(); let agents_list = combined_agents_filters_list.clone(); let listeners = listeners.clone(); - let trace_collector = trace_collector.clone(); let state_storage = state_storage.clone(); let service = service_fn(move |req| { let router_service = Arc::clone(&router_service); @@ -182,7 +170,6 @@ async fn main() -> Result<(), Box> { let model_aliases = Arc::clone(&model_aliases); let agents_list = agents_list.clone(); let listeners = listeners.clone(); - let trace_collector = trace_collector.clone(); let state_storage = state_storage.clone(); async move { @@ -202,7 +189,6 @@ async fn main() -> Result<(), Box> { fully_qualified_url, agents_list, listeners, - trace_collector, ) .with_context(parent_cx) .await; @@ -220,7 +206,6 @@ async fn main() -> Result<(), Box> { fully_qualified_url, model_aliases, llm_providers, - trace_collector, state_storage, ) .with_context(parent_cx) diff --git a/crates/brightstaff/src/utils/tracing.rs b/crates/brightstaff/src/utils/tracing.rs index 6da4b631..1326153a 100644 --- a/crates/brightstaff/src/utils/tracing.rs +++ b/crates/brightstaff/src/utils/tracing.rs @@ -2,11 +2,12 @@ use std::fmt; use std::sync::OnceLock; use opentelemetry::global; +use opentelemetry_otlp::WithExportConfig; use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider}; -use opentelemetry_stdout::SpanExporter; use time::macros::format_description; use tracing::{Event, Subscriber}; use tracing_subscriber::fmt::{format, time::FormatTime, FmtContext, FormatEvent, FormatFields}; +use tracing_subscriber::layer::SubscriberExt; use tracing_subscriber::EnvFilter; struct BracketedTime; @@ -58,21 +59,63 @@ static INIT_LOGGER: OnceLock = OnceLock::new(); pub fn init_tracer() -> &'static SdkTracerProvider { INIT_LOGGER.get_or_init(|| { global::set_text_map_propagator(TraceContextPropagator::new()); - // Install stdout exporter pipeline to be able to retrieve the collected spans. - // For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces. - let provider = SdkTracerProvider::builder() - .with_simple_exporter(SpanExporter::default()) - .build(); - global::set_tracer_provider(provider.clone()); + // Get OTEL collector URL from environment + let otel_endpoint = std::env::var("OTEL_COLLECTOR_URL") + .unwrap_or_else(|_| "http://localhost:4317".to_string()); - tracing_subscriber::fmt() - .with_env_filter( - EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), - ) - .event_format(BracketedFormatter) - .init(); + let tracing_enabled = std::env::var("OTEL_TRACING_ENABLED") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(false); - provider + // Create OTLP exporter to send spans to collector + if tracing_enabled { + // Set service name via environment if not already set + if std::env::var("OTEL_SERVICE_NAME").is_err() { + std::env::set_var("OTEL_SERVICE_NAME", "brightstaff"); + } + + let exporter = opentelemetry_otlp::SpanExporter::builder() + .with_tonic() + .with_endpoint(&otel_endpoint) + .build() + .expect("Failed to create OTLP span exporter"); + + let provider = SdkTracerProvider::builder() + .with_batch_exporter(exporter) + .build(); + + global::set_tracer_provider(provider.clone()); + + // Create OpenTelemetry tracing layer using TracerProvider trait + use opentelemetry::trace::TracerProvider as _; + let telemetry_layer = + tracing_opentelemetry::layer().with_tracer(provider.tracer("brightstaff")); + + // Combine the OpenTelemetry layer with fmt layer using the registry + let subscriber = tracing_subscriber::registry() + .with(telemetry_layer) + .with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info"))) + .with(tracing_subscriber::fmt::layer().event_format(BracketedFormatter)); + + tracing::subscriber::set_global_default(subscriber) + .expect("Failed to set tracing subscriber"); + + provider + } else { + // Tracing disabled - use no-op provider + let provider = SdkTracerProvider::builder().build(); + global::set_tracer_provider(provider.clone()); + + tracing_subscriber::fmt() + .with_env_filter( + EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")), + ) + .event_format(BracketedFormatter) + .init(); + + provider + } }) } diff --git a/demos/use_cases/mcp_filter/docker-compose.yaml b/demos/use_cases/mcp_filter/docker-compose.yaml index 469bf0d5..279f2d18 100644 --- a/demos/use_cases/mcp_filter/docker-compose.yaml +++ b/demos/use_cases/mcp_filter/docker-compose.yaml @@ -16,6 +16,8 @@ services: context: ../../../ dockerfile: Dockerfile ports: + - "11000:11000" + - "12001:12001" - "12000:12000" - "8001:8001" environment: