use standard tracing patterns

This commit is contained in:
Adil Hafeez 2026-01-28 20:38:54 -08:00
parent 43bdd0bfcf
commit 7b71364424
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
13 changed files with 291 additions and 1012 deletions

View file

@ -206,7 +206,7 @@ static_resources:
- name: outbound_api_traffic
address:
socket_address:
address: 127.0.0.1
address: 0.0.0.0
port_value: 11000
traffic_direction: OUTBOUND
filter_chains:
@ -1007,7 +1007,7 @@ static_resources:
- endpoint:
address:
socket_address:
address: 0.0.0.0
address: host.docker.internal
port_value: 9091
hostname: localhost

View file

@ -13,7 +13,9 @@
"env": {
"RUST_LOG": "debug",
"RUST_BACKTRACE": "1",
"ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/preference_based_routing/arch_config_rendered.yaml"
"ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/mcp_filter/config.yaml_rendered",
"OTEL_COLLECTOR_URL": "http://localhost:4317",
"OTEL_TRACING_ENABLED": "true"
},
"preLaunchTask": "rust: cargo build"
}

175
crates/Cargo.lock generated
View file

@ -1,6 +1,6 @@
# This file is automatically @generated by Cargo.
# It is not intended for manual editing.
version = 3
version = 4
[[package]]
name = "acap"
@ -195,7 +195,7 @@ dependencies = [
"serde_urlencoded",
"sync_wrapper",
"tokio",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
"tracing",
@ -354,7 +354,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234113d19d0d7d613b40e86fb654acf958910802bcceab913a4f9e7cda03b1a4"
dependencies = [
"memchr",
"regex-automata 0.4.9",
"regex-automata",
"serde",
]
@ -908,12 +908,6 @@ version = "0.31.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f"
[[package]]
name = "glob"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a8d1add55171497b4705a648c6b583acafb01d58050a51727785f0b2c8e0a2b2"
[[package]]
name = "governor"
version = "0.6.3"
@ -1526,11 +1520,11 @@ checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "matchers"
version = "0.1.0"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
checksum = "d1525a2a28c7f4fa0fc98bb91ae755d1e2d1505079e05539e35bc876b5d65ae9"
dependencies = [
"regex-automata 0.1.10",
"regex-automata",
]
[[package]]
@ -1672,12 +1666,11 @@ checksum = "38bf9645c8b145698bb0b18a4637dcacbc421ea49bef2317e4fd8065a387cf21"
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
version = "0.50.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5"
dependencies = [
"overload",
"winapi",
"windows-sys 0.59.0",
]
[[package]]
@ -1765,9 +1758,9 @@ dependencies = [
[[package]]
name = "opentelemetry"
version = "0.29.1"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e87237e2775f74896f9ad219d26a2081751187eb7c9f5c58dde20a23b95d16c"
checksum = "b84bcd6ae87133e903af7ef497404dda70c60d0ea14895fc8a5e6722754fc2a0"
dependencies = [
"futures-core",
"futures-sink",
@ -1779,25 +1772,23 @@ dependencies = [
[[package]]
name = "opentelemetry-http"
version = "0.29.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "46d7ab32b827b5b495bd90fa95a6cb65ccc293555dcc3199ae2937d2d237c8ed"
checksum = "d7a6d09a73194e6b66df7c8f1b680f156d916a1a942abf2de06823dd02b7855d"
dependencies = [
"async-trait",
"bytes",
"http 1.3.1",
"opentelemetry",
"reqwest",
"tracing",
]
[[package]]
name = "opentelemetry-otlp"
version = "0.29.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d899720fe06916ccba71c01d04ecd77312734e2de3467fd30d9d580c8ce85656"
checksum = "7a2366db2dca4d2ad033cad11e6ee42844fd727007af5ad04a1730f4cb8163bf"
dependencies = [
"futures-core",
"http 1.3.1",
"opentelemetry",
"opentelemetry-http",
@ -1813,44 +1804,43 @@ dependencies = [
[[package]]
name = "opentelemetry-proto"
version = "0.29.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8c40da242381435e18570d5b9d50aca2a4f4f4d8e146231adb4e7768023309b3"
checksum = "a7175df06de5eaee9909d4805a3d07e28bb752c34cab57fa9cff549da596b30f"
dependencies = [
"opentelemetry",
"opentelemetry_sdk",
"prost",
"tonic",
"tonic-prost",
]
[[package]]
name = "opentelemetry-stdout"
version = "0.29.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7e27d446dabd68610ef0b77d07b102ecde827a4596ea9c01a4d3811e945b286"
checksum = "bc8887887e169414f637b18751487cce4e095be787d23fad13c454e2fb1b3811"
dependencies = [
"chrono",
"futures-util",
"opentelemetry",
"opentelemetry_sdk",
]
[[package]]
name = "opentelemetry_sdk"
version = "0.29.0"
version = "0.31.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afdefb21d1d47394abc1ba6c57363ab141be19e27cc70d0e422b7f303e4d290b"
checksum = "e14ae4f5991976fd48df6d843de219ca6d31b01daaab2dad5af2badeded372bd"
dependencies = [
"futures-channel",
"futures-executor",
"futures-util",
"glob",
"opentelemetry",
"percent-encoding",
"rand 0.9.2",
"serde_json",
"thiserror 2.0.12",
"tracing",
"tokio",
"tokio-stream",
]
[[package]]
@ -1859,12 +1849,6 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a80800c0488c3a21695ea981a54918fbb37abf04f4d0720c453632255e2ff0e"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.4"
@ -2054,9 +2038,9 @@ dependencies = [
[[package]]
name = "prost"
version = "0.13.5"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2796faa41db3ec313a31f7624d9286acf277b52de526150b7e69f3debf891ee5"
checksum = "d2ea70524a2f82d518bce41317d0fae74151505651af45faf1ffbd6fd33f0568"
dependencies = [
"bytes",
"prost-derive",
@ -2064,9 +2048,9 @@ dependencies = [
[[package]]
name = "prost-derive"
version = "0.13.5"
version = "0.14.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a56d757972c98b346a9b766e3f02746cde6dd1cd1d1d563472929fdd74bec4d"
checksum = "27c6023962132f4b30eb4c172c91ce92d933da334c59c23cddee82358ddafb0b"
dependencies = [
"anyhow",
"itertools",
@ -2251,17 +2235,8 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
"regex-automata",
"regex-syntax",
]
[[package]]
@ -2272,15 +2247,9 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax 0.8.5",
"regex-syntax",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.5"
@ -2328,7 +2297,7 @@ dependencies = [
"tokio-native-tls",
"tokio-rustls 0.26.2",
"tokio-util",
"tower 0.5.2",
"tower",
"tower-http",
"tower-service",
"url",
@ -3157,9 +3126,9 @@ dependencies = [
[[package]]
name = "tonic"
version = "0.12.3"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "877c5b330756d856ffcc4553ab34a5684481ade925ecc54bcd1bf02b1d0d4d52"
checksum = "eb7613188ce9f7df5bfe185db26c5814347d110db17920415cf2fbcad85e7203"
dependencies = [
"async-trait",
"base64 0.22.1",
@ -3172,33 +3141,24 @@ dependencies = [
"hyper-util",
"percent-encoding",
"pin-project",
"prost",
"sync_wrapper",
"tokio",
"tokio-stream",
"tower 0.4.13",
"tower",
"tower-layer",
"tower-service",
"tracing",
]
[[package]]
name = "tower"
version = "0.4.13"
name = "tonic-prost"
version = "0.14.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b8fa9be0de6cf49e536ce1851f987bd21a43b771b09473c3549a6c853db37c1c"
checksum = "66bd50ad6ce1252d87ef024b3d64fe4c3cf54a86fb9ef4c631fdd0ded7aeaa67"
dependencies = [
"futures-core",
"futures-util",
"indexmap 1.9.3",
"pin-project",
"pin-project-lite",
"rand 0.8.5",
"slab",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
"bytes",
"prost",
"tonic",
]
[[package]]
@ -3209,9 +3169,12 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [
"futures-core",
"futures-util",
"indexmap 2.9.0",
"pin-project-lite",
"slab",
"sync_wrapper",
"tokio",
"tokio-util",
"tower-layer",
"tower-service",
"tracing",
@ -3230,7 +3193,7 @@ dependencies = [
"http-body 1.0.1",
"iri-string",
"pin-project-lite",
"tower 0.5.2",
"tower",
"tower-layer",
"tower-service",
]
@ -3249,9 +3212,9 @@ checksum = "8df9b6e13f2d32c91b9bd719c00d1958837bc7dec474d94952798cc8e69eeec3"
[[package]]
name = "tracing"
version = "0.1.41"
version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0"
checksum = "63e71662fa4b2a2c3a26f570f037eb95bb1f85397f3cd8076caed2f026a6d100"
dependencies = [
"log",
"pin-project-lite",
@ -3261,9 +3224,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
version = "0.1.28"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "395ae124c09f9e6918a2310af6038fba074bcf474ac352496d5910dd59a2226d"
checksum = "7490cfa5ec963746568740651ac6781f701c9c5ea257c58e057f3ba8cf69e8da"
dependencies = [
"proc-macro2",
"quote",
@ -3272,9 +3235,9 @@ dependencies = [
[[package]]
name = "tracing-core"
version = "0.1.33"
version = "0.1.36"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
checksum = "db97caf9d906fbde555dd62fa95ddba9eecfd14cb388e4f491a66d74cd5fb79a"
dependencies = [
"once_cell",
"valuable",
@ -3293,14 +3256,12 @@ dependencies = [
[[package]]
name = "tracing-opentelemetry"
version = "0.30.0"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fd8e764bd6f5813fd8bebc3117875190c5b0415be8f7f8059bffb6ecd979c444"
checksum = "1ac28f2d093c6c477eaa76b23525478f38de514fa9aeb1285738d4b97a9552fc"
dependencies = [
"js-sys",
"once_cell",
"opentelemetry",
"opentelemetry_sdk",
"smallvec",
"tracing",
"tracing-core",
@ -3311,14 +3272,14 @@ dependencies = [
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
version = "0.3.22"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
checksum = "2f30143827ddab0d256fd843b7a66d164e9f271cfa0dde49142c5ca0ca291f1e"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"regex-automata",
"sharded-slab",
"smallvec",
"thread_local",
@ -3589,28 +3550,6 @@ dependencies = [
"web-sys",
]
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.61.2"

View file

@ -19,11 +19,11 @@ http-body = "1.0.1"
http-body-util = "0.1.3"
hyper = { version = "1.6.0", features = ["full"] }
hyper-util = "0.1.11"
opentelemetry = "0.29.1"
opentelemetry-http = "0.29.0"
opentelemetry-otlp = {version="0.29.0", features=["trace", "tonic", "grpc-tonic"]}
opentelemetry-stdout = "0.29.0"
opentelemetry_sdk = "0.29.0"
opentelemetry = "0.31"
opentelemetry-http = "0.31"
opentelemetry-otlp = {version="0.31", features=["trace", "grpc-tonic"]}
opentelemetry-stdout = "0.31"
opentelemetry_sdk = { version = "0.31", features = ["rt-tokio"] }
pretty_assertions = "1.4.1"
rand = "0.9.2"
reqwest = { version = "0.12.15", features = ["stream"] }
@ -38,6 +38,7 @@ tokio-postgres = { version = "0.7", features = ["with-serde_json-1"] }
tokio-stream = "0.1"
time = { version = "0.3", features = ["formatting", "macros"] }
tracing = "0.1"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
uuid = { version = "1.0", features = ["v4", "serde"] }
@ -45,5 +46,5 @@ uuid = { version = "1.0", features = ["v4", "serde"] }
mockito = "1.0"
tokio-stream = "0.1.17"
tracing = "0.1.41"
tracing-opentelemetry = "0.30.0"
tracing-opentelemetry = "0.32.1"
tracing-subscriber = { version = "0.3.19", features = ["env-filter", "fmt", "time"] }

View file

@ -1,9 +1,7 @@
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use bytes::Bytes;
use common::consts::TRACE_PARENT_HEADER;
use common::traces::{generate_random_span_id, parse_traceparent, SpanBuilder, SpanKind};
use hermesllm::apis::OpenAIMessage;
use hermesllm::clients::SupportedAPIsFromClient;
use hermesllm::providers::request::ProviderRequest;
@ -12,13 +10,12 @@ use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use hyper::{Request, Response};
use serde::ser::Error as SerError;
use tracing::{debug, info, warn};
use tracing::{debug, info, instrument, warn};
use super::agent_selector::{AgentSelectionError, AgentSelector};
use super::pipeline_processor::{PipelineError, PipelineProcessor};
use super::response_handler::ResponseHandler;
use crate::router::plano_orchestrator::OrchestratorService;
use crate::tracing::{http, operation_component, OperationNameBuilder};
/// Main errors for agent chat completions
#[derive(Debug, thiserror::Error)]
@ -41,17 +38,8 @@ pub async fn agent_chat(
_: String,
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
match handle_agent_chat(
request,
orchestrator_service,
agents_list,
listeners,
trace_collector,
)
.await
{
match handle_agent_chat(request, orchestrator_service, agents_list, listeners).await {
Ok(response) => Ok(response),
Err(err) => {
// Check if this is a client error from the pipeline that should be cascaded
@ -121,12 +109,20 @@ pub async fn agent_chat(
}
}
#[instrument(
name = "agent_chat_handler",
skip(request, orchestrator_service, agents_list, listeners),
level = "info",
fields(
http.method = %request.method(),
http.path = %request.uri().path()
)
)]
async fn handle_agent_chat(
request: Request<hyper::body::Incoming>,
orchestrator_service: Arc<OrchestratorService>,
agents_list: Arc<tokio::sync::RwLock<Option<Vec<common::configuration::Agent>>>>,
listeners: Arc<tokio::sync::RwLock<Vec<common::configuration::Listener>>>,
trace_collector: Arc<common::traces::TraceCollector>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
// Initialize services
let agent_selector = AgentSelector::new(orchestrator_service);
@ -223,63 +219,11 @@ async fn handle_agent_chat(
agent_selector.create_agent_map(agents)
};
// Parse trace parent to get trace_id and parent_span_id
let (trace_id, parent_span_id) = if let Some(ref tp) = traceparent {
parse_traceparent(tp)
} else {
(String::new(), None)
};
// Select appropriate agents using arch orchestrator llm model
let selection_span_id = generate_random_span_id();
let selection_start_time = SystemTime::now();
let selection_start_instant = Instant::now();
let selected_agents = agent_selector
.select_agents(&message, &listener, traceparent.clone(), request_id.clone())
.await?;
// Record agent selection span
let selection_end_time = SystemTime::now();
let selection_elapsed = selection_start_instant.elapsed();
let selection_operation_name = OperationNameBuilder::new()
.with_method("POST")
.with_path("/agents/select")
.with_target(&listener.name)
.build();
let mut selection_span_builder = SpanBuilder::new(&selection_operation_name)
.with_span_id(selection_span_id)
.with_kind(SpanKind::Internal)
.with_start_time(selection_start_time)
.with_end_time(selection_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, "/agents/select")
.with_attribute("selection.listener", listener.name.clone())
.with_attribute("selection.agent_count", selected_agents.len().to_string())
.with_attribute(
"selection.agents",
selected_agents
.iter()
.map(|a| a.id.as_str())
.collect::<Vec<_>>()
.join(","),
)
.with_attribute(
"duration_ms",
format!("{:.2}", selection_elapsed.as_secs_f64() * 1000.0),
);
if !trace_id.is_empty() {
selection_span_builder = selection_span_builder.with_trace_id(trace_id.clone());
}
if let Some(parent_id) = parent_span_id.clone() {
selection_span_builder = selection_span_builder.with_parent_span_id(parent_id);
}
let selection_span = selection_span_builder.build();
trace_collector.record_span(operation_component::ORCHESTRATOR, selection_span);
info!("Selected {} agent(s) for execution", selected_agents.len());
// Execute agents sequentially, passing output from one to the next
@ -296,11 +240,6 @@ async fn handle_agent_chat(
selected_agent.id
);
// Record the start time for agent span
let agent_start_time = SystemTime::now();
let agent_start_instant = Instant::now();
let span_id = generate_random_span_id();
// Get agent name
let agent_name = selected_agent.id.clone();
@ -311,9 +250,6 @@ async fn handle_agent_chat(
selected_agent,
&agent_map,
&request_headers,
Some(&trace_collector),
trace_id.clone(),
span_id.clone(),
)
.await?;
@ -328,48 +264,9 @@ async fn handle_agent_chat(
client_request.clone(),
agent,
&request_headers,
trace_id.clone(),
span_id.clone(),
)
.await?;
// Record agent span
let agent_end_time = SystemTime::now();
let agent_elapsed = agent_start_instant.elapsed();
let full_path = format!("/agents{}", request_path);
let operation_name = OperationNameBuilder::new()
.with_method("POST")
.with_path(&full_path)
.with_target(&agent_name)
.build();
let mut span_builder = SpanBuilder::new(&operation_name)
.with_span_id(span_id)
.with_kind(SpanKind::Internal)
.with_start_time(agent_start_time)
.with_end_time(agent_end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, full_path)
.with_attribute("agent.name", agent_name.clone())
.with_attribute(
"agent.sequence",
format!("{}/{}", agent_index + 1, agent_count),
)
.with_attribute(
"duration_ms",
format!("{:.2}", agent_elapsed.as_secs_f64() * 1000.0),
);
if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id.clone());
}
if let Some(parent_id) = parent_span_id.clone() {
span_builder = span_builder.with_parent_span_id(parent_id);
}
let span = span_builder.build();
trace_collector.record_span(operation_component::AGENT, span);
// If this is the last agent, return the streaming response
if is_last_agent {
info!(

View file

@ -112,15 +112,7 @@ mod tests {
let headers = HeaderMap::new();
let result = pipeline_processor
.process_filter_chain(
&request.messages,
&test_pipeline,
&agent_map,
&headers,
None,
String::new(),
String::new(),
)
.process_filter_chain(&request.messages, &test_pipeline, &agent_map, &headers)
.await;
println!("Pipeline processing result: {:?}", result);

View file

@ -3,7 +3,6 @@ use common::configuration::{LlmProvider, ModelAlias};
use common::consts::{
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
};
use common::traces::TraceCollector;
use hermesllm::apis::openai_responses::InputParam;
use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
use hermesllm::{ProviderRequest, ProviderRequestType};
@ -11,10 +10,11 @@ use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full};
use hyper::header::{self};
use hyper::{Request, Response, StatusCode};
use opentelemetry::trace::get_active_span;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use tracing::{debug, info, instrument, warn};
use crate::handlers::router_chat::router_chat_get_upstream_model;
use crate::handlers::utils::{
@ -33,13 +33,23 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
.boxed()
}
#[instrument(
name = "llm_chat_handler",
skip_all,
fields(
http.method = %request.method(),
http.path = %request.uri().path(),
model.requested = tracing::field::Empty,
model.alias_resolved = tracing::field::Empty,
model.routing_resolved = tracing::field::Empty
)
)]
pub async fn llm_chat(
request: Request<hyper::body::Incoming>,
router_service: Arc<RouterService>,
full_qualified_llm_provider_url: String,
model_aliases: Arc<Option<HashMap<String, ModelAlias>>>,
llm_providers: Arc<RwLock<Vec<LlmProvider>>>,
trace_collector: Arc<TraceCollector>,
state_storage: Option<Arc<dyn StateStorage>>,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
let request_path = request.uri().path().to_string();
@ -119,13 +129,17 @@ pub async fn llm_chat(
// Model alias resolution: update model field in client_request immediately
// This ensures all downstream objects use the resolved model
let model_from_request = client_request.model().to_string();
let temperature = client_request.get_temperature();
let _temperature = client_request.get_temperature();
let is_streaming_request = client_request.is_streaming();
let resolved_model = resolve_model_alias(&model_from_request, &model_aliases);
// Record model information in span
tracing::Span::current().record("model.requested", model_from_request.as_str());
tracing::Span::current().record("model.alias_resolved", resolved_model.as_str());
// Extract tool names and user message preview for span attributes
let tool_names = client_request.get_tool_names();
let user_message_preview = client_request
let _tool_names = client_request.get_tool_names();
let _user_message_preview = client_request
.get_recent_user_message()
.map(|msg| truncate_message(&msg, 50));
@ -225,7 +239,6 @@ pub async fn llm_chat(
let routing_result = match router_chat_get_upstream_model(
router_service,
client_request, // Pass the original request - router_chat will convert it
trace_collector.clone(),
&traceparent,
&request_path,
&request_id,
@ -242,6 +255,13 @@ pub async fn llm_chat(
let model_name = routing_result.model_name;
// Record the routed model in span
tracing::Span::current().record("model.routing_resolved", model_name.as_str());
get_active_span(|span| {
span.update_name(format!("llm_chat POST {} -> {}", request_path, model_name));
});
debug!(
"[PLANO_REQ_ID:{}] | ARCH_ROUTER URL | {}, Resolved Model: {}",
request_id, full_qualified_llm_provider_url, model_name
@ -261,7 +281,7 @@ pub async fn llm_chat(
// Capture start time right before sending request to upstream
let request_start_time = std::time::Instant::now();
let request_start_system_time = std::time::SystemTime::now();
let _request_start_system_time = std::time::SystemTime::now();
let llm_response = match reqwest::Client::new()
.post(full_qualified_llm_provider_url)
@ -291,27 +311,9 @@ pub async fn llm_chat(
// Build LLM span with actual status code using constants
let byte_stream = llm_response.bytes_stream();
// Build the LLM span (will be finalized after streaming completes)
let llm_span = build_llm_span(
&traceparent,
&request_path,
&resolved_model,
&model_name,
upstream_status.as_u16(),
is_streaming_request,
request_start_system_time,
tool_names,
user_message_preview,
temperature,
&llm_providers,
)
.await;
// Create base processor for metrics and tracing
let base_processor = ObservableStreamProcessor::new(
trace_collector,
operation_component::LLM,
llm_span,
request_start_time,
Some(messages_for_signals),
);
@ -376,88 +378,6 @@ fn resolve_model_alias(
model_from_request.to_string()
}
/// Builds the LLM span with all required and optional attributes.
#[allow(clippy::too_many_arguments)]
async fn build_llm_span(
traceparent: &str,
request_path: &str,
resolved_model: &str,
model_name: &str,
status_code: u16,
is_streaming: bool,
start_time: std::time::SystemTime,
tool_names: Option<Vec<String>>,
user_message_preview: Option<String>,
temperature: Option<f32>,
llm_providers: &Arc<RwLock<Vec<LlmProvider>>>,
) -> common::traces::Span {
use crate::tracing::{http, llm, OperationNameBuilder};
use common::traces::{parse_traceparent, SpanBuilder, SpanKind};
// Calculate the upstream path based on provider configuration
let upstream_path = get_upstream_path(
llm_providers,
model_name,
request_path,
resolved_model,
is_streaming,
)
.await;
// Build operation name showing path transformation if different
let operation_name = if request_path != upstream_path {
OperationNameBuilder::new()
.with_method("POST")
.with_path(format!("{} >> {}", request_path, upstream_path))
.with_target(resolved_model)
.build()
} else {
OperationNameBuilder::new()
.with_method("POST")
.with_path(request_path)
.with_target(resolved_model)
.build()
};
let (trace_id, parent_span_id) = parse_traceparent(traceparent);
let mut span_builder = SpanBuilder::new(&operation_name)
.with_trace_id(&trace_id)
.with_kind(SpanKind::Client)
.with_start_time(start_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::STATUS_CODE, status_code.to_string())
.with_attribute(http::TARGET, request_path.to_string())
.with_attribute(http::UPSTREAM_TARGET, upstream_path)
.with_attribute(llm::MODEL_NAME, resolved_model.to_string())
.with_attribute(llm::IS_STREAMING, is_streaming.to_string());
// Only set parent span ID if it exists (not a root span)
if let Some(parent) = parent_span_id {
span_builder = span_builder.with_parent_span_id(&parent);
}
// Add optional attributes
if let Some(temp) = temperature {
span_builder = span_builder.with_attribute(llm::TEMPERATURE, temp.to_string());
}
if let Some(tools) = tool_names {
let formatted_tools = tools
.iter()
.map(|name| format!("{}(...)", name))
.collect::<Vec<_>>()
.join("\n");
span_builder = span_builder.with_attribute(llm::TOOLS, formatted_tools);
}
if let Some(preview) = user_message_preview {
span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview);
}
span_builder.build()
}
/// Calculates the upstream path for the provider based on the model name.
/// Looks up provider configuration, gets the ProviderId and base_url_path_prefix,
/// then uses target_endpoint_for_provider to calculate the correct upstream path.

View file

@ -4,15 +4,12 @@ use common::configuration::{Agent, AgentFilterChain};
use common::consts::{
ARCH_UPSTREAM_HOST_HEADER, BRIGHT_STAFF_SERVICE_NAME, ENVOY_RETRY_HEADER, TRACE_PARENT_HEADER,
};
use common::traces::{generate_random_span_id, SpanBuilder, SpanKind};
use hermesllm::apis::openai::Message;
use hermesllm::{ProviderRequest, ProviderRequestType};
use hyper::header::HeaderMap;
use std::time::{Instant, SystemTime};
use tracing::{debug, info, warn};
use crate::tracing::operation_component::{self};
use crate::tracing::{http, OperationNameBuilder};
use opentelemetry::global;
use opentelemetry::propagation::Injector;
use tracing::{debug, info, instrument, warn};
use crate::handlers::jsonrpc::{
JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, JSON_RPC_VERSION,
@ -53,6 +50,19 @@ pub enum PipelineError {
},
}
/// Adapter to inject OpenTelemetry trace context into Hyper HeaderMap
struct HeaderMapInjector<'a>(&'a mut HeaderMap);
impl<'a> Injector for HeaderMapInjector<'a> {
fn set(&mut self, key: &str, value: String) {
if let Ok(header_name) = hyper::header::HeaderName::from_bytes(key.as_bytes()) {
if let Ok(header_value) = hyper::header::HeaderValue::from_str(&value) {
self.0.insert(header_name, header_value);
}
}
}
}
/// Service for processing agent pipelines
pub struct PipelineProcessor {
client: reqwest::Client,
@ -81,115 +91,14 @@ impl PipelineProcessor {
}
}
/// Record a span for filter execution
#[allow(clippy::too_many_arguments)]
fn record_filter_span(
&self,
collector: &std::sync::Arc<common::traces::TraceCollector>,
agent_name: &str,
tool_name: &str,
start_time: SystemTime,
end_time: SystemTime,
elapsed: std::time::Duration,
trace_id: String,
parent_span_id: String,
span_id: String,
) -> String {
// let (trace_id, parent_span_id) = self.extract_trace_context();
// Build operation name: POST /agents/* {filter_name}
// Using generic path since we don't have access to specific endpoint here
let operation_name = OperationNameBuilder::new()
.with_method("POST")
.with_path("/agents/*")
.with_target(agent_name)
.build();
let mut span_builder = SpanBuilder::new(&operation_name)
.with_span_id(span_id.clone())
.with_kind(SpanKind::Client)
.with_start_time(start_time)
.with_end_time(end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, "/agents/*")
.with_attribute("filter.name", agent_name.to_string())
.with_attribute("filter.tool_name", tool_name.to_string())
.with_attribute(
"duration_ms",
format!("{:.2}", elapsed.as_secs_f64() * 1000.0),
);
if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id);
}
if !parent_span_id.is_empty() {
span_builder = span_builder.with_parent_span_id(parent_span_id);
}
let span = span_builder.build();
// Use plano(filter) as service name for filter execution spans
collector.record_span(operation_component::AGENT_FILTER, span);
span_id.clone()
}
/// Record a span for MCP protocol interactions
#[allow(clippy::too_many_arguments)]
fn record_agent_filter_span(
&self,
collector: &std::sync::Arc<common::traces::TraceCollector>,
operation: &str,
agent_id: &str,
start_time: SystemTime,
end_time: SystemTime,
elapsed: std::time::Duration,
additional_attrs: Option<HashMap<&str, String>>,
trace_id: String,
parent_span_id: String,
span_id: Option<String>,
) {
// let (trace_id, parent_span_id) = self.extract_trace_context();
// Build operation name: POST /mcp {agent_id}
let operation_name = OperationNameBuilder::new()
.with_method("POST")
.with_path("/mcp")
.with_operation(operation)
.with_target(agent_id)
.build();
let mut span_builder = SpanBuilder::new(&operation_name)
.with_span_id(span_id.unwrap_or_else(generate_random_span_id))
.with_kind(SpanKind::Client)
.with_start_time(start_time)
.with_end_time(end_time)
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, format!("/mcp ({})", operation))
.with_attribute("mcp.operation", operation.to_string())
.with_attribute("mcp.agent_id", agent_id.to_string())
.with_attribute(
"duration_ms",
format!("{:.2}", elapsed.as_secs_f64() * 1000.0),
);
if let Some(attrs) = additional_attrs {
for (key, value) in attrs {
span_builder = span_builder.with_attribute(key, value);
}
}
if !trace_id.is_empty() {
span_builder = span_builder.with_trace_id(trace_id);
}
if !parent_span_id.is_empty() {
span_builder = span_builder.with_parent_span_id(parent_span_id);
}
let span = span_builder.build();
// MCP spans also use plano(filter) service name as they are part of filter operations
collector.record_span(operation_component::AGENT_FILTER, span);
}
/// Process the filter chain of agents (all except the terminal agent)
#[instrument(
skip(self, chat_history, agent_filter_chain, agent_map, request_headers),
fields(
filter_count = agent_filter_chain.filter_chain.as_ref().map(|fc| fc.len()).unwrap_or(0),
message_count = chat_history.len()
)
)]
#[allow(clippy::too_many_arguments)]
pub async fn process_filter_chain(
&mut self,
@ -197,9 +106,6 @@ impl PipelineProcessor {
agent_filter_chain: &AgentFilterChain,
agent_map: &HashMap<String, Agent>,
request_headers: &HeaderMap,
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
trace_id: String,
parent_span_id: String,
) -> Result<Vec<Message>, PipelineError> {
let mut chat_history_updated = chat_history.to_vec();
@ -227,60 +133,21 @@ impl PipelineProcessor {
chat_history.len()
);
let start_time = SystemTime::now();
let start_instant = Instant::now();
// Generate filter span ID before execution so MCP spans can use it as parent
let filter_span_id = generate_random_span_id();
if agent.agent_type.as_deref().unwrap_or("mcp") == "mcp" {
chat_history_updated = self
.execute_mcp_filter(
&chat_history_updated,
agent,
request_headers,
trace_collector,
trace_id.clone(),
filter_span_id.clone(),
)
.execute_mcp_filter(&chat_history_updated, agent, request_headers)
.await?;
} else {
chat_history_updated = self
.execute_http_filter(
&chat_history_updated,
agent,
request_headers,
trace_collector,
trace_id.clone(),
filter_span_id.clone(),
)
.execute_http_filter(&chat_history_updated, agent, request_headers)
.await?;
}
let end_time = SystemTime::now();
let elapsed = start_instant.elapsed();
info!(
"Filter '{}' completed in {:.2}ms, updated conversation length: {}",
"Filter '{}' completed, updated conversation length: {}",
agent_name,
elapsed.as_secs_f64() * 1000.0,
chat_history_updated.len()
);
// Record span for this filter execution
if let Some(collector) = trace_collector {
self.record_filter_span(
collector,
agent_name,
tool_name,
start_time,
end_time,
elapsed,
trace_id.clone(),
parent_span_id.clone(),
filter_span_id,
);
}
}
Ok(chat_history_updated)
@ -292,18 +159,17 @@ impl PipelineProcessor {
request_headers: &HeaderMap,
agent_id: &str,
session_id: Option<&str>,
trace_id: String,
parent_span_id: String,
) -> Result<HeaderMap, PipelineError> {
let trace_parent = format!("00-{}-{}-01", trace_id, parent_span_id);
let mut headers = request_headers.clone();
headers.remove(hyper::header::CONTENT_LENGTH);
// Inject OpenTelemetry trace context automatically
headers.remove(TRACE_PARENT_HEADER);
headers.insert(
TRACE_PARENT_HEADER,
hyper::header::HeaderValue::from_str(&trace_parent).unwrap(),
);
global::get_text_map_propagator(|propagator| {
let cx =
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
propagator.inject_context(&cx, &mut HeaderMapInjector(&mut headers));
});
headers.insert(
ARCH_UPSTREAM_HOST_HEADER,
@ -429,27 +295,31 @@ impl PipelineProcessor {
}
/// Send request to a specific agent and return the response content
#[instrument(
skip(self, messages, agent, request_headers),
fields(
agent_id = %agent.id,
filter_name = %agent.id,
message_count = messages.len()
)
)]
async fn execute_mcp_filter(
&mut self,
messages: &[Message],
agent: &Agent,
request_headers: &HeaderMap,
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
trace_id: String,
filter_span_id: String,
) -> Result<Vec<Message>, PipelineError> {
// Update current span name to include filter name
use opentelemetry::trace::get_active_span;
get_active_span(|span| {
span.update_name(format!("execute_mcp_filter ({})", agent.id));
});
// Get or create MCP session
let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) {
session_id.clone()
} else {
let session_id = self
.get_new_session_id(
&agent.id,
trace_id.clone(),
filter_span_id.clone(),
request_headers,
)
.await;
let session_id = self.get_new_session_id(&agent.id, request_headers).await;
self.agent_id_session_map
.insert(agent.id.clone(), session_id.clone());
session_id
@ -464,21 +334,9 @@ impl PipelineProcessor {
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
let json_rpc_request = self.build_tool_call_request(tool_name, messages)?;
// Generate span ID for this MCP tool call (child of filter span)
let mcp_span_id = generate_random_span_id();
// Build headers
let agent_headers = self.build_mcp_headers(
request_headers,
&agent.id,
Some(&mcp_session_id),
trace_id.clone(),
mcp_span_id.clone(),
)?;
// Send request with tracing
let start_time = SystemTime::now();
let start_instant = Instant::now();
let agent_headers =
self.build_mcp_headers(request_headers, &agent.id, Some(&mcp_session_id))?;
let response = self
.send_mcp_request(&json_rpc_request, &agent_headers, &agent.id)
@ -486,31 +344,6 @@ impl PipelineProcessor {
let http_status = response.status();
let response_bytes = response.bytes().await?;
let end_time = SystemTime::now();
let elapsed = start_instant.elapsed();
// Record MCP tool call span
if let Some(collector) = trace_collector {
let mut attrs = HashMap::new();
attrs.insert("mcp.method", "tools/call".to_string());
attrs.insert("mcp.tool_name", tool_name.to_string());
attrs.insert("mcp.session_id", mcp_session_id.clone());
attrs.insert("http.status_code", http_status.as_u16().to_string());
self.record_agent_filter_span(
collector,
"tool_call",
&agent.id,
start_time,
end_time,
elapsed,
Some(attrs),
trace_id.clone(),
filter_span_id.clone(),
Some(mcp_span_id),
);
}
// Handle HTTP errors
if !http_status.is_success() {
let error_body = String::from_utf8_lossy(&response_bytes).to_string();
@ -611,8 +444,6 @@ impl PipelineProcessor {
&self,
agent_id: &str,
session_id: &str,
trace_id: String,
parent_span_id: String,
request_headers: &HeaderMap,
) -> Result<(), PipelineError> {
let initialized_notification = JsonRpcNotification {
@ -624,13 +455,7 @@ impl PipelineProcessor {
let notification_body = serde_json::to_string(&initialized_notification)?;
debug!("Sending initialized notification for agent {}", agent_id);
let headers = self.build_mcp_headers(
request_headers,
agent_id,
Some(session_id),
trace_id.clone(),
parent_span_id.clone(),
)?;
let headers = self.build_mcp_headers(request_headers, agent_id, Some(session_id))?;
let response = self
.client
@ -648,24 +473,12 @@ impl PipelineProcessor {
Ok(())
}
async fn get_new_session_id(
&self,
agent_id: &str,
trace_id: String,
parent_span_id: String,
request_headers: &HeaderMap,
) -> String {
async fn get_new_session_id(&self, agent_id: &str, request_headers: &HeaderMap) -> String {
info!("Initializing MCP session for agent {}", agent_id);
let initialize_request = self.build_initialize_request();
let headers = self
.build_mcp_headers(
request_headers,
agent_id,
None,
trace_id.clone(),
parent_span_id.clone(),
)
.build_mcp_headers(request_headers, agent_id, None)
.expect("Failed to build headers for initialization");
let response = self
@ -688,44 +501,46 @@ impl PipelineProcessor {
);
// Send initialized notification
self.send_initialized_notification(
agent_id,
&session_id,
trace_id.clone(),
parent_span_id.clone(),
&headers,
)
.await
.expect("Failed to send initialized notification");
self.send_initialized_notification(agent_id, &session_id, &headers)
.await
.expect("Failed to send initialized notification");
session_id
}
/// Execute a HTTP-based filter agent
#[instrument(
skip(self, messages, agent, request_headers),
fields(
agent_id = %agent.id,
agent_url = %agent.url,
filter_name = %agent.id,
message_count = messages.len()
)
)]
async fn execute_http_filter(
&mut self,
messages: &[Message],
agent: &Agent,
request_headers: &HeaderMap,
trace_collector: Option<&std::sync::Arc<common::traces::TraceCollector>>,
trace_id: String,
filter_span_id: String,
) -> Result<Vec<Message>, PipelineError> {
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
// Generate span ID for this HTTP call (child of filter span)
let http_span_id = generate_random_span_id();
// Update current span name to include filter name
use opentelemetry::trace::get_active_span;
get_active_span(|span| {
span.update_name(format!("execute_http_filter ({})", agent.id));
});
// Build headers
let trace_parent = format!("00-{}-{}-01", trace_id, http_span_id);
let mut agent_headers = request_headers.clone();
agent_headers.remove(hyper::header::CONTENT_LENGTH);
// Inject OpenTelemetry trace context automatically
agent_headers.remove(TRACE_PARENT_HEADER);
agent_headers.insert(
TRACE_PARENT_HEADER,
hyper::header::HeaderValue::from_str(&trace_parent).unwrap(),
);
global::get_text_map_propagator(|propagator| {
let cx =
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
propagator.inject_context(&cx, &mut HeaderMapInjector(&mut agent_headers));
});
agent_headers.insert(
ARCH_UPSTREAM_HOST_HEADER,
@ -748,10 +563,6 @@ impl PipelineProcessor {
hyper::header::HeaderValue::from_static("application/json"),
);
// Send request with tracing
let start_time = SystemTime::now();
let start_instant = Instant::now();
debug!(
"Sending HTTP request to agent {} at URL: {}",
agent.id, agent.url
@ -769,30 +580,6 @@ impl PipelineProcessor {
let http_status = response.status();
let response_bytes = response.bytes().await?;
let end_time = SystemTime::now();
let elapsed = start_instant.elapsed();
// Record HTTP call span
if let Some(collector) = trace_collector {
let mut attrs = HashMap::new();
attrs.insert("http.tool_name", tool_name.to_string());
attrs.insert("http.url", agent.url.clone());
attrs.insert("http.status_code", http_status.as_u16().to_string());
self.record_agent_filter_span(
collector,
"http_call",
&agent.id,
start_time,
end_time,
elapsed,
Some(attrs),
trace_id.clone(),
filter_span_id.clone(),
Some(http_span_id),
);
}
// Handle HTTP errors
if !http_status.is_success() {
let error_body = String::from_utf8_lossy(&response_bytes).to_string();
@ -825,14 +612,19 @@ impl PipelineProcessor {
}
/// Send request to terminal agent and return the raw response for streaming
#[instrument(
skip(self, messages, original_request, terminal_agent, request_headers),
fields(
agent_id = %terminal_agent.id,
message_count = messages.len()
)
)]
pub async fn invoke_agent(
&self,
messages: &[Message],
mut original_request: ProviderRequestType,
terminal_agent: &Agent,
request_headers: &HeaderMap,
trace_id: String,
agent_span_id: String,
) -> Result<reqwest::Response, PipelineError> {
// let mut request = original_request.clone();
original_request.set_messages(messages);
@ -844,15 +636,13 @@ impl PipelineProcessor {
let mut agent_headers = request_headers.clone();
agent_headers.remove(hyper::header::CONTENT_LENGTH);
// Set traceparent header to make the egress span a child of the agent span
if !trace_id.is_empty() && !agent_span_id.is_empty() {
let trace_parent = format!("00-{}-{}-01", trace_id, agent_span_id);
agent_headers.remove(TRACE_PARENT_HEADER);
agent_headers.insert(
TRACE_PARENT_HEADER,
hyper::header::HeaderValue::from_str(&trace_parent).unwrap(),
);
}
// Inject OpenTelemetry trace context automatically
agent_headers.remove(TRACE_PARENT_HEADER);
global::get_text_map_propagator(|propagator| {
let cx =
tracing_opentelemetry::OpenTelemetrySpanExt::context(&tracing::Span::current());
propagator.inject_context(&cx, &mut HeaderMapInjector(&mut agent_headers));
});
agent_headers.insert(
ARCH_UPSTREAM_HOST_HEADER,
@ -914,15 +704,7 @@ mod tests {
let pipeline = create_test_pipeline(vec!["nonexistent-agent", "terminal-agent"]);
let result = processor
.process_filter_chain(
&messages,
&pipeline,
&agent_map,
&request_headers,
None,
String::new(),
String::new(),
)
.process_filter_chain(&messages, &pipeline, &agent_map, &request_headers)
.await;
assert!(result.is_err());
@ -956,14 +738,7 @@ mod tests {
let request_headers = HeaderMap::new();
let result = processor
.execute_mcp_filter(
&messages,
&agent,
&request_headers,
None,
"trace-123".to_string(),
"span-123".to_string(),
)
.execute_mcp_filter(&messages, &agent, &request_headers)
.await;
match result {
@ -1002,14 +777,7 @@ mod tests {
let request_headers = HeaderMap::new();
let result = processor
.execute_mcp_filter(
&messages,
&agent,
&request_headers,
None,
"trace-456".to_string(),
"span-456".to_string(),
)
.execute_mcp_filter(&messages, &agent, &request_headers)
.await;
match result {
@ -1061,14 +829,7 @@ mod tests {
let request_headers = HeaderMap::new();
let result = processor
.execute_mcp_filter(
&messages,
&agent,
&request_headers,
None,
"trace-789".to_string(),
"span-789".to_string(),
)
.execute_mcp_filter(&messages, &agent, &request_headers)
.await;
match result {

View file

@ -1,14 +1,11 @@
use common::configuration::ModelUsagePreference;
use common::traces::{parse_traceparent, SpanBuilder, SpanKind, TraceCollector};
use hermesllm::clients::endpoints::SupportedUpstreamAPIs;
use hermesllm::{ProviderRequest, ProviderRequestType};
use hyper::StatusCode;
use std::collections::HashMap;
use std::sync::Arc;
use tracing::{debug, info, warn};
use crate::router::llm_router::RouterService;
use crate::tracing::{http, operation_component, routing, OperationNameBuilder};
pub struct RoutingResult {
pub model_name: String,
@ -36,7 +33,6 @@ impl RoutingError {
pub async fn router_chat_get_upstream_model(
router_service: Arc<RouterService>,
client_request: ProviderRequestType,
trace_collector: Arc<TraceCollector>,
traceparent: &str,
request_path: &str,
request_id: &str,
@ -120,8 +116,8 @@ pub async fn router_chat_get_upstream_model(
);
// Capture start time for routing span
let routing_start_time = std::time::Instant::now();
let routing_start_system_time = std::time::SystemTime::now();
let _routing_start_time = std::time::Instant::now();
let _routing_start_system_time = std::time::SystemTime::now();
// Attempt to determine route using the router service
let routing_result = router_service
@ -135,21 +131,7 @@ pub async fn router_chat_get_upstream_model(
match routing_result {
Ok(route) => match route {
Some((_, model_name)) => {
// Record successful routing span
let mut attrs: HashMap<String, String> = HashMap::new();
attrs.insert("route.selected_model".to_string(), model_name.clone());
record_routing_span(
trace_collector,
traceparent,
routing_start_time,
routing_start_system_time,
attrs,
)
.await;
Ok(RoutingResult { model_name })
}
Some((_, model_name)) => Ok(RoutingResult { model_name }),
None => {
// No route determined, use default model from request
info!(
@ -158,91 +140,14 @@ pub async fn router_chat_get_upstream_model(
chat_request.model
);
let default_model = chat_request.model.clone();
let mut attrs = HashMap::new();
attrs.insert("route.selected_model".to_string(), default_model.clone());
record_routing_span(
trace_collector,
traceparent,
routing_start_time,
routing_start_system_time,
attrs,
)
.await;
Ok(RoutingResult {
model_name: default_model,
model_name: chat_request.model.clone(),
})
}
},
Err(err) => {
// Record failed routing span
let mut attrs = HashMap::new();
attrs.insert("route.selected_model".to_string(), "unknown".to_string());
attrs.insert("error.message".to_string(), err.to_string());
record_routing_span(
trace_collector,
traceparent,
routing_start_time,
routing_start_system_time,
attrs,
)
.await;
Err(RoutingError::internal_error(format!(
"Failed to determine route: {}",
err
)))
}
Err(err) => Err(RoutingError::internal_error(format!(
"Failed to determine route: {}",
err
))),
}
}
/// Helper function to record a routing span with the given attributes.
/// Reduces code duplication across different routing outcomes.
async fn record_routing_span(
trace_collector: Arc<TraceCollector>,
traceparent: &str,
start_time: std::time::Instant,
start_system_time: std::time::SystemTime,
attrs: HashMap<String, String>,
) {
// The routing always uses OpenAI Chat Completions format internally,
// so we log that as the actual API being used for routing
let routing_api_path = "/v1/chat/completions";
let routing_operation_name = OperationNameBuilder::new()
.with_method("POST")
.with_path(routing_api_path)
.with_target("Arch-Router-1.5B")
.build();
let (trace_id, parent_span_id) = parse_traceparent(traceparent);
// Build the routing span directly using constants
let mut span_builder = SpanBuilder::new(&routing_operation_name)
.with_trace_id(&trace_id)
.with_kind(SpanKind::Client)
.with_start_time(start_system_time)
.with_end_time(std::time::SystemTime::now())
.with_attribute(http::METHOD, "POST")
.with_attribute(http::TARGET, routing_api_path.to_string())
.with_attribute(
routing::ROUTE_DETERMINATION_MS,
start_time.elapsed().as_millis().to_string(),
);
// Only set parent span ID if it exists (not a root span)
if let Some(parent) = parent_span_id {
span_builder = span_builder.with_parent_span_id(&parent);
}
// Add all custom attributes
for (key, value) in attrs {
span_builder = span_builder.with_attribute(key, value);
}
let span = span_builder.build();
// Record the span directly to the collector
trace_collector.record_span(operation_component::ROUTING, span);
}

View file

@ -1,18 +1,14 @@
use bytes::Bytes;
use common::traces::{Attribute, AttributeValue, Event, Span, TraceCollector};
use http_body_util::combinators::BoxBody;
use http_body_util::StreamBody;
use hyper::body::Frame;
use std::sync::Arc;
use std::time::{Instant, SystemTime};
use std::time::Instant;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::warn;
use tracing::{info, warn};
// Import tracing constants and signals
use crate::signals::{InteractionQuality, SignalAnalyzer, TextBasedSignalAnalyzer, FLAG_MARKER};
use crate::tracing::{error, llm, signals as signal_constants};
use crate::signals::{SignalAnalyzer, TextBasedSignalAnalyzer};
use hermesllm::apis::openai::Message;
/// Trait for processing streaming chunks
@ -31,11 +27,9 @@ pub trait StreamProcessor: Send + 'static {
fn on_error(&mut self, _error: &str) {}
}
/// A processor that tracks streaming metrics and finalizes the span
/// A processor that tracks streaming metrics
pub struct ObservableStreamProcessor {
collector: Arc<TraceCollector>,
service_name: String,
span: Span,
total_bytes: usize,
chunk_count: usize,
start_time: Instant,
@ -47,22 +41,16 @@ impl ObservableStreamProcessor {
/// Create a new passthrough processor
///
/// # Arguments
/// * `collector` - The trace collector to record the span to
/// * `service_name` - The service name for this span (e.g., "archgw(llm)")
/// * `span` - The span to finalize after streaming completes
/// * `start_time` - When the request started (for duration calculation)
/// * `messages` - Optional conversation messages for signal analysis
pub fn new(
collector: Arc<TraceCollector>,
service_name: impl Into<String>,
span: Span,
start_time: Instant,
messages: Option<Vec<Message>>,
) -> Self {
Self {
collector,
service_name: service_name.into(),
span,
total_bytes: 0,
chunk_count: 0,
start_time,
@ -87,186 +75,30 @@ impl StreamProcessor for ObservableStreamProcessor {
}
fn on_complete(&mut self) {
// Update span with streaming metrics and end time
let end_time_nanos = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
self.span.end_time_unix_nano = format!("{}", end_time_nanos);
// Add streaming metrics as attributes using constants
self.span.attributes.push(Attribute {
key: llm::RESPONSE_BYTES.to_string(),
value: AttributeValue {
string_value: Some(self.total_bytes.to_string()),
},
});
self.span.attributes.push(Attribute {
key: llm::DURATION_MS.to_string(),
value: AttributeValue {
string_value: Some(self.start_time.elapsed().as_millis().to_string()),
},
});
// Add time to first token if available (streaming only)
if let Some(ttft) = self.time_to_first_token {
self.span.attributes.push(Attribute {
key: llm::TIME_TO_FIRST_TOKEN_MS.to_string(),
value: AttributeValue {
string_value: Some(ttft.to_string()),
},
});
// Add time to first token as a span event
// Calculate the timestamp by adding ttft duration to span start time
if let Ok(start_time_nanos) = self.span.start_time_unix_nano.parse::<u128>() {
// Convert ttft from milliseconds to nanoseconds and add to start time
let event_timestamp = start_time_nanos + (ttft * 1_000_000);
let mut event =
Event::new(llm::TIME_TO_FIRST_TOKEN_MS.to_string(), event_timestamp);
event.add_attribute(llm::TIME_TO_FIRST_TOKEN_MS.to_string(), ttft.to_string());
// Initialize events vector if needed
if self.span.events.is_none() {
self.span.events = Some(Vec::new());
}
if let Some(ref mut events) = self.span.events {
events.push(event);
}
}
}
// Analyze signals if messages are available and add to span attributes
// Analyze signals if messages are available
if let Some(ref messages) = self.messages {
let analyzer: Box<dyn SignalAnalyzer> = Box::new(TextBasedSignalAnalyzer::new());
let report = analyzer.analyze(messages);
// Add overall quality
self.span.attributes.push(Attribute {
key: signal_constants::QUALITY.to_string(),
value: AttributeValue {
string_value: Some(format!("{:?}", report.overall_quality)),
},
});
// Add repair/follow-up metrics if concerning
if report.follow_up.is_concerning || report.follow_up.repair_count > 0 {
self.span.attributes.push(Attribute {
key: signal_constants::REPAIR_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.follow_up.repair_count.to_string()),
},
});
self.span.attributes.push(Attribute {
key: signal_constants::REPAIR_RATIO.to_string(),
value: AttributeValue {
string_value: Some(format!("{:.3}", report.follow_up.repair_ratio)),
},
});
}
// Add flag marker to operation name if any concerning signal is detected
let should_flag = report.frustration.has_frustration
|| report.repetition.has_looping
|| report.escalation.escalation_requested
|| matches!(
report.overall_quality,
InteractionQuality::Poor | InteractionQuality::Severe
);
if should_flag {
// Prepend flag marker to the operation name
self.span.name = format!("{} {}", self.span.name, FLAG_MARKER);
}
// Add key signal metrics
if report.frustration.has_frustration {
self.span.attributes.push(Attribute {
key: signal_constants::FRUSTRATION_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.frustration.frustration_count.to_string()),
},
});
self.span.attributes.push(Attribute {
key: signal_constants::FRUSTRATION_SEVERITY.to_string(),
value: AttributeValue {
string_value: Some(report.frustration.severity.to_string()),
},
});
}
if report.repetition.has_looping {
self.span.attributes.push(Attribute {
key: signal_constants::REPETITION_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.repetition.repetition_count.to_string()),
},
});
}
if report.escalation.escalation_requested {
self.span.attributes.push(Attribute {
key: signal_constants::ESCALATION_REQUESTED.to_string(),
value: AttributeValue {
string_value: Some("true".to_string()),
},
});
}
if report.positive_feedback.has_positive_feedback {
self.span.attributes.push(Attribute {
key: signal_constants::POSITIVE_FEEDBACK_COUNT.to_string(),
value: AttributeValue {
string_value: Some(report.positive_feedback.positive_count.to_string()),
},
});
}
let _report = analyzer.analyze(messages);
// Signal analysis complete - OpenTelemetry automatic instrumentation handles span attributes
}
// Record the finalized span
self.collector
.record_span(&self.service_name, self.span.clone());
info!(
service = %self.service_name,
total_bytes = self.total_bytes,
chunk_count = self.chunk_count,
duration_ms = self.start_time.elapsed().as_millis(),
time_to_first_token_ms = ?self.time_to_first_token,
"Streaming completed"
);
}
fn on_error(&mut self, error_msg: &str) {
warn!("Stream error in PassthroughProcessor: {}", error_msg);
// Update span with error info and end time
let end_time_nanos = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos();
self.span.end_time_unix_nano = format!("{}", end_time_nanos);
self.span.attributes.push(Attribute {
key: error::ERROR.to_string(),
value: AttributeValue {
string_value: Some("true".to_string()),
},
});
self.span.attributes.push(Attribute {
key: error::MESSAGE.to_string(),
value: AttributeValue {
string_value: Some(error_msg.to_string()),
},
});
self.span.attributes.push(Attribute {
key: llm::DURATION_MS.to_string(),
value: AttributeValue {
string_value: Some(self.start_time.elapsed().as_millis().to_string()),
},
});
// Record the error span
self.collector
.record_span(&self.service_name, self.span.clone());
warn!(
service = %self.service_name,
error = error_msg,
duration_ms = self.start_time.elapsed().as_millis(),
"Stream error"
);
}
}

View file

@ -13,7 +13,6 @@ use common::configuration::{Agent, Configuration};
use common::consts::{
CHAT_COMPLETIONS_PATH, MESSAGES_PATH, OPENAI_RESPONSES_API_PATH, PLANO_ORCHESTRATOR_MODEL_NAME,
};
use common::traces::TraceCollector;
use http_body_util::{combinators::BoxBody, BodyExt, Empty};
use hyper::body::Incoming;
use hyper::server::conn::http1;
@ -112,17 +111,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Initialize trace collector and start background flusher
// Tracing is enabled if the tracing config is present in arch_config.yaml
// Pass Some(true/false) to override, or None to use env var OTEL_TRACING_ENABLED
let tracing_enabled = if arch_config.tracing.is_some() {
info!("Tracing configuration found in arch_config.yaml");
Some(true)
} else {
info!(
"No tracing configuration in arch_config.yaml, will check OTEL_TRACING_ENABLED env var"
);
None
};
let trace_collector = Arc::new(TraceCollector::new(tracing_enabled));
let _flusher_handle = trace_collector.clone().start_background_flusher();
// OpenTelemetry automatic instrumentation is configured in utils/tracing.rs
// Initialize conversation state storage for v1/responses
// Configurable via arch_config.yaml state_storage section
@ -171,7 +160,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_providers = llm_providers.clone();
let agents_list = combined_agents_filters_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let state_storage = state_storage.clone();
let service = service_fn(move |req| {
let router_service = Arc::clone(&router_service);
@ -182,7 +170,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let model_aliases = Arc::clone(&model_aliases);
let agents_list = agents_list.clone();
let listeners = listeners.clone();
let trace_collector = trace_collector.clone();
let state_storage = state_storage.clone();
async move {
@ -202,7 +189,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fully_qualified_url,
agents_list,
listeners,
trace_collector,
)
.with_context(parent_cx)
.await;
@ -220,7 +206,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
fully_qualified_url,
model_aliases,
llm_providers,
trace_collector,
state_storage,
)
.with_context(parent_cx)

View file

@ -2,11 +2,12 @@ use std::fmt;
use std::sync::OnceLock;
use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider};
use opentelemetry_stdout::SpanExporter;
use time::macros::format_description;
use tracing::{Event, Subscriber};
use tracing_subscriber::fmt::{format, time::FormatTime, FmtContext, FormatEvent, FormatFields};
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::EnvFilter;
struct BracketedTime;
@ -58,21 +59,63 @@ static INIT_LOGGER: OnceLock<SdkTracerProvider> = OnceLock::new();
pub fn init_tracer() -> &'static SdkTracerProvider {
INIT_LOGGER.get_or_init(|| {
global::set_text_map_propagator(TraceContextPropagator::new());
// Install stdout exporter pipeline to be able to retrieve the collected spans.
// For the demonstration, use `Sampler::AlwaysOn` sampler to sample all traces.
let provider = SdkTracerProvider::builder()
.with_simple_exporter(SpanExporter::default())
.build();
global::set_tracer_provider(provider.clone());
// Get OTEL collector URL from environment
let otel_endpoint = std::env::var("OTEL_COLLECTOR_URL")
.unwrap_or_else(|_| "http://localhost:4317".to_string());
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
)
.event_format(BracketedFormatter)
.init();
let tracing_enabled = std::env::var("OTEL_TRACING_ENABLED")
.ok()
.and_then(|s| s.parse().ok())
.unwrap_or(false);
provider
// Create OTLP exporter to send spans to collector
if tracing_enabled {
// Set service name via environment if not already set
if std::env::var("OTEL_SERVICE_NAME").is_err() {
std::env::set_var("OTEL_SERVICE_NAME", "brightstaff");
}
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(&otel_endpoint)
.build()
.expect("Failed to create OTLP span exporter");
let provider = SdkTracerProvider::builder()
.with_batch_exporter(exporter)
.build();
global::set_tracer_provider(provider.clone());
// Create OpenTelemetry tracing layer using TracerProvider trait
use opentelemetry::trace::TracerProvider as _;
let telemetry_layer =
tracing_opentelemetry::layer().with_tracer(provider.tracer("brightstaff"));
// Combine the OpenTelemetry layer with fmt layer using the registry
let subscriber = tracing_subscriber::registry()
.with(telemetry_layer)
.with(EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")))
.with(tracing_subscriber::fmt::layer().event_format(BracketedFormatter));
tracing::subscriber::set_global_default(subscriber)
.expect("Failed to set tracing subscriber");
provider
} else {
// Tracing disabled - use no-op provider
let provider = SdkTracerProvider::builder().build();
global::set_tracer_provider(provider.clone());
tracing_subscriber::fmt()
.with_env_filter(
EnvFilter::try_from_default_env().unwrap_or_else(|_| EnvFilter::new("info")),
)
.event_format(BracketedFormatter)
.init();
provider
}
})
}

View file

@ -16,6 +16,8 @@ services:
context: ../../../
dockerfile: Dockerfile
ports:
- "11000:11000"
- "12001:12001"
- "12000:12000"
- "8001:8001"
environment: