use custom exporter

This commit is contained in:
Adil Hafeez 2026-02-07 12:37:49 -08:00
parent b861f41c03
commit 7eec3bc932
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
11 changed files with 349 additions and 44 deletions

View file

@ -14,6 +14,7 @@
"RUST_LOG": "debug",
"RUST_BACKTRACE": "1",
"ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/multi_agent_with_crewai_langchain/config.yaml_rendered",
// "ARCH_CONFIG_PATH_RENDERED": "../demos/use_cases/preference_based_routing/config.yaml_rendered",
"OTEL_COLLECTOR_URL": "http://localhost:4317",
"OTEL_TRACING_ENABLED": "true"
},

View file

@ -9,6 +9,7 @@ use hermesllm::ProviderRequestType;
use http_body_util::combinators::BoxBody;
use http_body_util::BodyExt;
use hyper::{Request, Response};
use opentelemetry::trace::get_active_span;
use serde::ser::Error as SerError;
use tracing::{debug, info, info_span, warn, Instrument};
@ -16,6 +17,7 @@ use super::agent_selector::{AgentSelectionError, AgentSelector};
use super::pipeline_processor::{PipelineError, PipelineProcessor};
use super::response_handler::ResponseHandler;
use crate::router::plano_orchestrator::OrchestratorService;
use crate::tracing::{operation_component, set_service_name};
/// Main errors for agent chat completions
#[derive(Debug, thiserror::Error)]
@ -52,7 +54,8 @@ pub async fn agent_chat(
// Create a span with request_id that will be included in all log lines
let request_span = info_span!(
"agent_chat_handler",
"(orchestrator)",
component = "orchestrator",
request_id = %request_id,
http.method = %request.method(),
http.path = %request.uri().path()
@ -60,6 +63,9 @@ pub async fn agent_chat(
// Execute the handler inside the span
async {
// Set service name for orchestrator operations
set_service_name(operation_component::ORCHESTRATOR);
match handle_agent_chat_inner(
request,
orchestrator_service,
@ -163,13 +169,17 @@ async fn handle_agent_chat_inner(
.and_then(|name| name.to_str().ok());
// Find the appropriate listener
let listener = {
let listener: common::configuration::Listener = {
let listeners = listeners.read().await;
agent_selector
.find_listener(listener_name, &listeners)
.await?
};
get_active_span(|span| {
span.update_name(format!("(orchestrator) {}", listener.name));
});
info!(listener = %listener.name, "handling request");
// Parse request body

View file

@ -26,7 +26,7 @@ use crate::state::response_state_processor::ResponsesStateProcessor;
use crate::state::{
extract_input_items, retrieve_and_combine_input, StateStorage, StateStorageError,
};
use crate::tracing::operation_component;
use crate::tracing::{operation_component, set_service_name};
fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
Full::new(chunk.into())
@ -55,7 +55,8 @@ pub async fn llm_chat(
// Create a span with request_id that will be included in all log lines
let request_span = info_span!(
"llm_chat_handler",
"llm",
component = "llm",
request_id = %request_id,
http.method = %request.method(),
http.path = %request_path,
@ -92,6 +93,9 @@ async fn llm_chat_inner(
request_path: String,
mut request_headers: hyper::HeaderMap,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
// Set service name for LLM operations
set_service_name(operation_component::LLM);
// Extract or generate traceparent - this establishes the trace context for all spans
let traceparent: String = match request_headers
.get(TRACE_PARENT_HEADER)
@ -148,20 +152,25 @@ async fn llm_chat_inner(
let model_from_request = client_request.model().to_string();
let _temperature = client_request.get_temperature();
let is_streaming_request = client_request.is_streaming();
let resolved_model = resolve_model_alias(&model_from_request, &model_aliases);
let alias_resolved_model = resolve_model_alias(&model_from_request, &model_aliases);
// Record model information in span
tracing::Span::current().record("model.requested", model_from_request.as_str());
tracing::Span::current().record("model.alias_resolved", resolved_model.as_str());
tracing::Span::current().record("model.alias_resolved", alias_resolved_model.as_str());
// Validate that the requested model exists in configuration
// This matches the validation in llm_gateway routing.rs
if llm_providers.read().await.get(&resolved_model).is_none() {
if llm_providers
.read()
.await
.get(&alias_resolved_model)
.is_none()
{
let err_msg = format!(
"Model '{}' not found in configured providers",
resolved_model
alias_resolved_model
);
warn!(model = %resolved_model, "model not found in configured providers");
warn!(model = %alias_resolved_model, "model not found in configured providers");
let mut bad_request = Response::new(full(err_msg));
*bad_request.status_mut() = StatusCode::BAD_REQUEST;
return Ok(bad_request);
@ -169,10 +178,10 @@ async fn llm_chat_inner(
// Handle provider/model slug format (e.g., "openai/gpt-4")
// Extract just the model name for upstream (providers don't understand the slug)
let model_name_only = if let Some((_, model)) = resolved_model.split_once('/') {
let model_name_only = if let Some((_, model)) = alias_resolved_model.split_once('/') {
model.to_string()
} else {
resolved_model.clone()
alias_resolved_model.clone()
};
// Extract tool names and user message preview for span attributes
@ -207,9 +216,9 @@ async fn llm_chat_inner(
// Get the upstream path and check if it's ResponsesAPI
let upstream_path = get_upstream_path(
&llm_providers,
&resolved_model,
&alias_resolved_model,
&request_path,
&resolved_model,
&alias_resolved_model,
is_streaming_request,
)
.await;
@ -294,31 +303,39 @@ async fn llm_chat_inner(
// Determine final model to use
// Router returns "none" as a sentinel value when it doesn't select a specific model
let router_selected_model = routing_result.model_name;
let model_name = if router_selected_model != "none" {
let resolved_model = if router_selected_model != "none" {
// Router selected a specific model via routing preferences
router_selected_model
} else {
// Router returned "none" sentinel, use validated resolved_model from request
resolved_model.clone()
alias_resolved_model.clone()
};
// Record the routed model in span
tracing::Span::current().record("model.routing_resolved", model_name.as_str());
tracing::Span::current().record("model.routing_resolved", resolved_model.as_str());
get_active_span(|span| {
span.update_name(format!("llm_chat POST {} -> {}", request_path, model_name));
let span_name = if model_from_request == resolved_model {
format!("(llm) {} {}", request_path, resolved_model)
} else {
format!(
"(llm) {} {} -> {}",
request_path, model_from_request, resolved_model
)
};
span.update_name(span_name);
});
debug!(
url = %full_qualified_llm_provider_url,
provider_hint = %model_name,
provider_hint = %resolved_model,
upstream_model = %model_name_only,
"Routing to upstream"
);
request_headers.insert(
ARCH_PROVIDER_HINT_HEADER,
header::HeaderValue::from_str(&model_name).unwrap(),
header::HeaderValue::from_str(&resolved_model).unwrap(),
);
request_headers.insert(
@ -385,8 +402,8 @@ async fn llm_chat_inner(
base_processor,
state_store,
original_input_items,
alias_resolved_model.clone(),
resolved_model.clone(),
model_name.clone(),
is_streaming_request,
false, // Not OpenAI upstream since should_manage_state is true
content_encoding,

View file

@ -9,12 +9,14 @@ use hermesllm::{ProviderRequest, ProviderRequestType};
use hyper::header::HeaderMap;
use opentelemetry::global;
use opentelemetry::propagation::Injector;
use opentelemetry::trace::get_active_span;
use tracing::{debug, info, instrument, warn};
use crate::handlers::jsonrpc::{
JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse, JSON_RPC_VERSION,
MCP_INITIALIZE, MCP_INITIALIZE_NOTIFICATION, TOOL_CALL_METHOD,
};
use crate::tracing::{operation_component, set_service_name};
use uuid::Uuid;
/// Errors that can occur during pipeline processing
@ -91,14 +93,14 @@ impl PipelineProcessor {
}
}
/// Process the filter chain of agents (all except the terminal agent)
#[instrument(
skip(self, chat_history, agent_filter_chain, agent_map, request_headers),
fields(
filter_count = agent_filter_chain.filter_chain.as_ref().map(|fc| fc.len()).unwrap_or(0),
message_count = chat_history.len()
)
)]
// /// Process the filter chain of agents (all except the terminal agent)
// #[instrument(
// skip(self, chat_history, agent_filter_chain, agent_map, request_headers),
// fields(
// filter_count = agent_filter_chain.filter_chain.as_ref().map(|fc| fc.len()).unwrap_or(0),
// message_count = chat_history.len()
// )
// )]
#[allow(clippy::too_many_arguments)]
pub async fn process_filter_chain(
&mut self,
@ -309,6 +311,9 @@ impl PipelineProcessor {
agent: &Agent,
request_headers: &HeaderMap,
) -> Result<Vec<Message>, PipelineError> {
// Set service name for this filter span
set_service_name(operation_component::AGENT_FILTER);
// Update current span name to include filter name
use opentelemetry::trace::get_active_span;
get_active_span(|span| {
@ -524,6 +529,9 @@ impl PipelineProcessor {
agent: &Agent,
request_headers: &HeaderMap,
) -> Result<Vec<Message>, PipelineError> {
// Set service name for this filter span
set_service_name(operation_component::AGENT_FILTER);
// Update current span name to include filter name
use opentelemetry::trace::get_active_span;
get_active_span(|span| {
@ -626,9 +634,18 @@ impl PipelineProcessor {
terminal_agent: &Agent,
request_headers: &HeaderMap,
) -> Result<reqwest::Response, PipelineError> {
// Set service name for agent invocation span
set_service_name(operation_component::AGENT);
// let mut request = original_request.clone();
original_request.set_messages(messages);
let request_url = "/v1/chat/completions";
get_active_span(|span| {
span.update_name(format!("(agent) {} {}", terminal_agent.id, request_url));
});
let request_body = ProviderRequestType::to_bytes(&original_request).unwrap();
// let request_body = serde_json::to_string(&request)?;
debug!("sending request to terminal agent {}", terminal_agent.id);
@ -657,7 +674,7 @@ impl PipelineProcessor {
let response = self
.client
.post(format!("{}/v1/chat/completions", self.url))
.post(format!("{}{}", self.url, request_url))
.headers(agent_headers)
.body(request_body)
.send()

View file

@ -9,6 +9,7 @@ use tokio_stream::StreamExt;
use tracing::{info, warn, Instrument};
use crate::signals::{SignalAnalyzer, TextBasedSignalAnalyzer};
use crate::tracing::set_service_name;
use hermesllm::apis::openai::Message;
/// Trait for processing streaming chunks
@ -41,7 +42,9 @@ impl ObservableStreamProcessor {
/// Create a new passthrough processor
///
/// # Arguments
/// * `service_name` - The service name for this span (e.g., "archgw(llm)")
/// * `service_name` - The service name for this span (e.g., "plano(llm)")
/// This will be set as the `service.name.override` attribute on the current span,
/// allowing the ServiceNameOverrideExporter to route spans to different services.
/// * `start_time` - When the request started (for duration calculation)
/// * `messages` - Optional conversation messages for signal analysis
pub fn new(
@ -49,8 +52,14 @@ impl ObservableStreamProcessor {
start_time: Instant,
messages: Option<Vec<Message>>,
) -> Self {
let service_name = service_name.into();
// Set the service name override on the current span for OpenTelemetry export
// This allows the ServiceNameOverrideExporter to route this span to the correct service
set_service_name(&service_name);
Self {
service_name: service_name.into(),
service_name,
total_bytes: 0,
chunk_count: 0,
start_time,

View file

@ -1,5 +1,54 @@
mod constants;
mod service_name_exporter;
pub use constants::{
error, http, llm, operation_component, routing, signals, OperationNameBuilder,
};
pub use service_name_exporter::{ServiceNameOverrideExporter, SERVICE_NAME_OVERRIDE_KEY};
use opentelemetry::trace::TraceContextExt;
use opentelemetry::KeyValue;
use tracing_opentelemetry::OpenTelemetrySpanExt;
/// Tags the currently active tracing span with a per-span service name.
///
/// The name is written as the `service.name.override` attribute
/// (`SERVICE_NAME_OVERRIDE_KEY`) on the OpenTelemetry span reached through the
/// context of `tracing::Span::current()`. `ServiceNameOverrideExporter` looks for
/// that attribute when grouping spans by logical service
/// (e.g., `plano(llm)`, `plano(filter)`).
///
/// # Arguments
/// * `service_name` - The service name to use (e.g., `operation_component::LLM`)
///
/// # Example
/// ```rust,ignore
/// use brightstaff::tracing::{set_service_name, operation_component};
///
/// // Inside a traced function:
/// set_service_name(operation_component::LLM);
/// ```
pub fn set_service_name(service_name: &str) {
    let override_attr = KeyValue::new(SERVICE_NAME_OVERRIDE_KEY, service_name.to_owned());
    // The temporaries (span -> context -> span ref) live until the end of this statement.
    tracing::Span::current()
        .context()
        .span()
        .set_attribute(override_attr);
}
/// Tags an explicitly supplied tracing span with a per-span service name.
///
/// Behaves like `set_service_name`, but targets `span` instead of whatever span
/// is current: the `service.name.override` attribute (`SERVICE_NAME_OVERRIDE_KEY`)
/// is set on the OpenTelemetry span reached through `span`'s context.
///
/// # Arguments
/// * `span` - The tracing span to set the service name on
/// * `service_name` - The service name to use (e.g., `operation_component::LLM`)
pub fn set_service_name_on_span(span: &tracing::Span, service_name: &str) {
    let override_attr = KeyValue::new(SERVICE_NAME_OVERRIDE_KEY, service_name.to_owned());
    // The temporary context outlives the span ref borrowed from it within this statement.
    span.context().span().set_attribute(override_attr);
}

View file

@ -0,0 +1,167 @@
//! Service Name Override Exporter
//!
//! This module provides a custom SpanExporter that allows per-span service.name overrides.
//! In OpenTelemetry, `service.name` is part of the Resource, which is tied to the TracerProvider.
//! However, if you need different service names for different spans (e.g., `plano(orchestrator)`,
//! `plano(filter)`, `plano(llm)`) within the same provider, this exporter handles that by:
//!
//! 1. Looking for a special span attribute `service.name.override`
//! 2. Grouping spans by their effective service name
//! 3. Exporting each group via a dedicated OTLP exporter whose Resource has the correct
//! `service.name`
//!
//! All per-service exporters are created eagerly at construction time so that no tonic
//! channel creation happens later inside `futures_executor::block_on` (which the
//! `BatchSpanProcessor` uses and which lacks a tokio runtime).
//!
//! # Usage
//!
//! ```rust,ignore
//! use brightstaff::tracing::{set_service_name, operation_component};
//!
//! // In your instrumented code, set the service name override:
//! set_service_name(operation_component::LLM);
//! ```
use opentelemetry::Key;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::error::OTelSdkResult;
use opentelemetry_sdk::trace::{SpanData, SpanExporter};
use opentelemetry_sdk::Resource;
use std::collections::HashMap;
use std::time::Duration;
use tokio::sync::Mutex;
use super::operation_component;
/// The span attribute key that carries a per-span service name override.
///
/// `ServiceNameOverrideExporter::export` searches every span's attributes for this
/// key and uses the attribute's value as the span's effective service name.
pub const SERVICE_NAME_OVERRIDE_KEY: &str = "service.name.override";
/// Service name used when a span carries no override attribute. Spans whose
/// override value has no dedicated exporter are also sent through this
/// service's exporter.
const DEFAULT_SERVICE_NAME: &str = "plano";
/// All known service names that get a dedicated exporter in
/// `ServiceNameOverrideExporter::new`.
///
/// A service name that is used as an override but is not listed here has no
/// exporter of its own; its spans are exported under `DEFAULT_SERVICE_NAME`.
const ALL_SERVICE_NAMES: &[&str] = &[
DEFAULT_SERVICE_NAME,
operation_component::INBOUND,
operation_component::ROUTING,
operation_component::ORCHESTRATOR,
operation_component::HANDOFF,
operation_component::AGENT_FILTER,
operation_component::AGENT,
operation_component::LLM,
];
/// A `SpanExporter` that supports per-span `service.name` overrides.
///
/// Internally it holds one OTLP exporter per known service name (the entries of
/// `ALL_SERVICE_NAMES`). Each exporter is given its own `Resource` carrying that
/// service name, so that backends like Jaeger can show the spans under the
/// intended service.
pub struct ServiceNameOverrideExporter {
/// Map from service name → pre-created OTLP exporter.
///
/// Each exporter sits behind a tokio `Mutex` that `export` locks while it sends
/// a group of spans. The original rationale given for the mutex is that
/// `SpanExporter::export` takes `&self` and the returned future must be `Send`.
exporters: HashMap<String, Mutex<opentelemetry_otlp::SpanExporter>>,
}
// Manual `Debug`: prints only the configured service names rather than the wrapped
// exporters (the original note says `opentelemetry_otlp::SpanExporter` has no
// `Debug` impl — TODO confirm against the pinned crate version).
impl std::fmt::Debug for ServiceNameOverrideExporter {
    /// Formats the value as `ServiceNameOverrideExporter { services: [...] }`.
    ///
    /// The service names are sorted before printing. `HashMap` iteration order is
    /// arbitrary, so without sorting the output could differ between otherwise
    /// identical exporters, which makes logs and test snapshots hard to compare.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut services: Vec<&str> = self.exporters.keys().map(String::as_str).collect();
        services.sort_unstable();
        f.debug_struct("ServiceNameOverrideExporter")
            .field("services", &services)
            .finish()
    }
}
impl ServiceNameOverrideExporter {
/// Create a new `ServiceNameOverrideExporter`.
///
/// This eagerly creates one OTLP gRPC exporter per known service name so
/// that the tonic channel is established while a tokio runtime is available.
///
/// # Arguments
/// * `endpoint` The OTLP collector endpoint URL (e.g. `http://localhost:4317`)
pub fn new(endpoint: &str) -> Self {
let mut exporters = HashMap::new();
for &service_name in ALL_SERVICE_NAMES {
let resource = Resource::builder_empty()
.with_service_name(service_name)
.build();
let mut exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(endpoint)
.build()
.expect("Failed to create OTLP span exporter");
exporter.set_resource(&resource);
exporters.insert(service_name.to_string(), Mutex::new(exporter));
}
Self { exporters }
}
}
impl SpanExporter for ServiceNameOverrideExporter {
    /// Exports `batch`, routing each span to the exporter of its effective service.
    ///
    /// A span's effective service name is the value of its `service.name.override`
    /// attribute, or `DEFAULT_SERVICE_NAME` when the attribute is absent. Spans are
    /// grouped by that name and each group is sent through the matching pre-created
    /// exporter; names without a dedicated exporter go through the default one.
    ///
    /// # Errors
    /// Every group is attempted even if an earlier one fails. Each failure is logged,
    /// and the first failure is returned so the caller can observe that the batch was
    /// not fully exported.
    fn export(
        &self,
        batch: Vec<SpanData>,
    ) -> impl std::future::Future<Output = OTelSdkResult> + Send {
        let override_key = Key::new(SERVICE_NAME_OVERRIDE_KEY);

        // Group spans by their effective service name. This happens synchronously so
        // the returned future owns the grouped data outright.
        let mut spans_by_service: HashMap<String, Vec<SpanData>> = HashMap::new();
        for span in batch {
            let service_name = span
                .attributes
                .iter()
                .find(|kv| kv.key == override_key)
                .map(|kv| kv.value.to_string())
                .unwrap_or_else(|| DEFAULT_SERVICE_NAME.to_string());
            spans_by_service.entry(service_name).or_default().push(span);
        }

        async move {
            let mut outcome: OTelSdkResult = Ok(());

            for (service_name, spans) in spans_by_service {
                // Single lookup with fallback: unknown service names are exported
                // through the default service's exporter.
                let Some(exporter_mutex) = self
                    .exporters
                    .get(&service_name)
                    .or_else(|| self.exporters.get(DEFAULT_SERVICE_NAME))
                else {
                    continue;
                };

                let exporter = exporter_mutex.lock().await;
                if let Err(e) = exporter.export(spans).await {
                    tracing::warn!(
                        service = %service_name,
                        error = ?e,
                        "Failed to export spans"
                    );
                    // Keep exporting the remaining groups, but remember the first failure.
                    if outcome.is_ok() {
                        outcome = Err(e);
                    }
                }
            }

            outcome
        }
    }

    /// Shuts down every inner exporter, giving each one `timeout`.
    ///
    /// All exporters are shut down even if one of them fails; the first failure is
    /// returned. Because `timeout` is applied per exporter, the call as a whole may
    /// take longer than `timeout`.
    fn shutdown_with_timeout(&mut self, timeout: Duration) -> OTelSdkResult {
        let mut outcome: OTelSdkResult = Ok(());

        for exporter_mutex in self.exporters.values_mut() {
            // `&mut self` guarantees exclusive access, so the exporter can be reached
            // without locking and no exporter can be skipped because of a held lock.
            let result = exporter_mutex.get_mut().shutdown_with_timeout(timeout);
            if outcome.is_ok() {
                outcome = result;
            }
        }

        outcome
    }

    /// Intentionally ignores the provider's resource.
    ///
    /// NOTE(review): any additional attributes on the provider resource are not
    /// forwarded to the inner exporters — confirm that this is intended.
    fn set_resource(&mut self, _resource: &Resource) {
        // Each inner exporter already has its own resource set at creation time.
        // Nothing to propagate.
    }
}

View file

@ -2,7 +2,6 @@ use std::fmt;
use std::sync::OnceLock;
use opentelemetry::global;
use opentelemetry_otlp::WithExportConfig;
use opentelemetry_sdk::{propagation::TraceContextPropagator, trace::SdkTracerProvider};
use time::macros::format_description;
use tracing::{Event, Subscriber};
@ -12,6 +11,8 @@ use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::util::SubscriberInitExt;
use tracing_subscriber::EnvFilter;
use crate::tracing::ServiceNameOverrideExporter;
struct BracketedTime;
impl FormatTime for BracketedTime {
@ -96,14 +97,13 @@ pub fn init_tracer() -> &'static SdkTracerProvider {
if tracing_enabled {
// Set service name via environment if not already set
if std::env::var("OTEL_SERVICE_NAME").is_err() {
std::env::set_var("OTEL_SERVICE_NAME", "brightstaff");
std::env::set_var("OTEL_SERVICE_NAME", "plano");
}
let exporter = opentelemetry_otlp::SpanExporter::builder()
.with_tonic()
.with_endpoint(&otel_endpoint)
.build()
.expect("Failed to create OTLP span exporter");
// Create ServiceNameOverrideExporter to support per-span service names
// This allows spans to have different service names (e.g., plano(orchestrator),
// plano(filter), plano(llm)) by setting the "service.name.override" attribute
let exporter = ServiceNameOverrideExporter::new(&otel_endpoint);
let provider = SdkTracerProvider::builder()
.with_batch_exporter(exporter)

View file

@ -1,5 +1,19 @@
services:
plano:
build:
context: ../../../
dockerfile: Dockerfile
ports:
- "12000:12000"
- "12001:12001"
environment:
- ARCH_CONFIG_PATH=/app/arch_config.yaml
- OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}
- OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces
volumes:
- ./config.yaml:/app/arch_config.yaml:ro
- /etc/ssl/cert.pem:/etc/ssl/cert.pem
open-web-ui:
image: dyrnq/open-webui:main

View file

@ -38,15 +38,20 @@ services:
- LLM_GATEWAY_ENDPOINT=http://plano:12000/v1
command: ["python", "-u", "langchain/weather_agent.py"]
open-web-ui:
image: dyrnq/open-webui:main
anythingllm:
image: mintplexlabs/anythingllm
restart: always
ports:
- "8080:8080"
- "3001:3001"
cap_add:
- SYS_ADMIN
environment:
- DEFAULT_MODEL=gpt-4o-mini
- ENABLE_OPENAI_API=true
- OPENAI_API_BASE_URL=http://plano:8001/v1
- STORAGE_DIR=/app/server/storage
- LLM_PROVIDER=generic-openai
- GENERIC_OPEN_AI_BASE_PATH=http://plano:8001/v1
- GENERIC_OPEN_AI_MODEL_PREF=gpt-4o-mini
- GENERIC_OPEN_AI_MODEL_TOKEN_LIMIT=128000
- GENERIC_OPEN_AI_API_KEY=sk-placeholder
jaeger:
build:

View file

@ -1,5 +1,21 @@
services:
plano:
build:
context: ../../../
dockerfile: Dockerfile
ports:
- "12000:12000"
- "12001:12001"
environment:
- ARCH_CONFIG_PATH=/app/arch_config.yaml
- OPENAI_API_KEY=${OPENAI_API_KEY:?OPENAI_API_KEY environment variable is required but not set}
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY:?ANTHROPIC_API_KEY environment variable is required but not set}
- OTEL_TRACING_HTTP_ENDPOINT=http://host.docker.internal:4318/v1/traces
volumes:
- ./config.yaml:/app/arch_config.yaml:ro
- /etc/ssl/cert.pem:/etc/ssl/cert.pem
open-web-ui:
image: dyrnq/open-webui:main
restart: always