diff --git a/.github/workflows/e2e_archgw.yml b/.github/workflows/e2e_archgw.yml index 436ffd1d..97e2492c 100644 --- a/.github/workflows/e2e_archgw.yml +++ b/.github/workflows/e2e_archgw.yml @@ -30,7 +30,7 @@ jobs: - name: build arch docker image run: | - cd ../../ && docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.21 -t katanemo/archgw:latest + cd ../../ && docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.22 -t katanemo/archgw:latest - name: start archgw env: diff --git a/.github/workflows/e2e_test_currency_convert.yml b/.github/workflows/e2e_test_currency_convert.yml index 36af1b35..05be3d84 100644 --- a/.github/workflows/e2e_test_currency_convert.yml +++ b/.github/workflows/e2e_test_currency_convert.yml @@ -24,7 +24,7 @@ jobs: - name: build arch docker image run: | - docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.21 + docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.22 - name: install poetry run: | diff --git a/.github/workflows/e2e_test_preference_based_routing.yml b/.github/workflows/e2e_test_preference_based_routing.yml index b888c779..3f921f60 100644 --- a/.github/workflows/e2e_test_preference_based_routing.yml +++ b/.github/workflows/e2e_test_preference_based_routing.yml @@ -24,7 +24,7 @@ jobs: - name: build arch docker image run: | - docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.21 + docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.22 - name: install poetry run: | diff --git a/.github/workflows/rust_tests.yml b/.github/workflows/rust_tests.yml index 9837531d..e75a6b60 100644 --- a/.github/workflows/rust_tests.yml +++ b/.github/workflows/rust_tests.yml @@ -29,3 +29,6 @@ jobs: - name: Run unit tests run: cargo test --lib + + - name: Run trace integration tests + run: cargo test -p common --features trace-collection traces::tests::trace_integration_test diff --git a/.github/workflows/validate_arch_config.yml b/.github/workflows/validate_arch_config.yml index 100d2de4..19b95980 100644 --- a/.github/workflows/validate_arch_config.yml +++ b/.github/workflows/validate_arch_config.yml @@ -24,7 +24,7 @@ jobs: - name: build arch docker image run: | - docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.21 + docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.22 - name: validate arch config run: | diff --git a/README.md b/README.md index 58f50d5d..ed2c118b 100644 --- a/README.md +++ b/README.md @@ -87,7 +87,7 @@ Arch's CLI allows you to manage and interact with the Arch gateway efficiently. ```console $ python3.12 -m venv venv $ source venv/bin/activate # On Windows, use: venv\Scripts\activate -$ pip install archgw==0.3.21 +$ pip install archgw==0.3.22 ``` ### Use Arch as a LLM Router @@ -276,7 +276,7 @@ endpoints: ```sh $ archgw up arch_config.yaml -2024-12-05 16:56:27,979 - cli.main - INFO - Starting archgw cli version: 0.3.21 +2024-12-05 16:56:27,979 - cli.main - INFO - Starting archgw cli version: 0.3.22 2024-12-05 16:56:28,485 - cli.utils - INFO - Schema validation successful! 2024-12-05 16:56:28,485 - cli.main - INFO - Starting arch model server and arch gateway 2024-12-05 16:56:51,647 - cli.core - INFO - Container is healthy! diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml index b1c4f487..3d618faf 100644 --- a/arch/envoy.template.yaml +++ b/arch/envoy.template.yaml @@ -51,11 +51,11 @@ static_resources: envoy_grpc: cluster_name: opentelemetry_collector timeout: 0.250s - service_name: archgw(inbound) + service_name: plano(inbound) random_sampling: value: {{ arch_tracing.random_sampling }} {% endif %} - stat_prefix: ingress_traffic + stat_prefix: plano(inbound) codec_type: AUTO scheme_header_transformation: scheme_to_overwrite: https @@ -95,21 +95,6 @@ static_resources: - name: envoy.filters.network.http_connection_manager typed_config: "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} - generate_request_id: true - tracing: - provider: - name: envoy.tracers.opentelemetry - typed_config: - "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig - grpc_service: - envoy_grpc: - cluster_name: opentelemetry_collector - timeout: 0.250s - service_name: ingress_traffic - random_sampling: - value: {{ arch_tracing.random_sampling }} - {% endif %} stat_prefix: ingress_traffic codec_type: AUTO scheme_header_transformation: @@ -221,7 +206,7 @@ static_resources: - name: outbound_api_traffic address: socket_address: - address: 0.0.0.0 + address: 127.0.0.1 port_value: 11000 traffic_direction: OUTBOUND filter_chains: @@ -240,7 +225,7 @@ static_resources: envoy_grpc: cluster_name: opentelemetry_collector timeout: 0.250s - service_name: outbound_api_traffic + service_name: tools random_sampling: value: {{ arch_tracing.random_sampling }} {% endif %} @@ -413,7 +398,7 @@ static_resources: envoy_grpc: cluster_name: opentelemetry_collector timeout: 0.250s - service_name: archgw(outbound) + service_name: plano(outbound) random_sampling: value: {{ arch_tracing.random_sampling }} {% endif %} @@ -484,6 +469,50 @@ static_resources: typed_config: "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router +{% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} + - name: otel_collector_proxy + address: + socket_address: + address: 127.0.0.1 + port_value: 9903 + traffic_direction: OUTBOUND + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + stat_prefix: otel_proxy + codec_type: AUTO + access_log: + - name: envoy.access_loggers.file + typed_config: + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "/var/log/access_otel.log" + format: | + [%START_TIME%] "%REQ(:METHOD)% %REQ(X-ENVOY-ORIGINAL-PATH?:PATH)% %PROTOCOL%" %RESPONSE_CODE% %RESPONSE_FLAGS% %BYTES_RECEIVED% %BYTES_SENT% %DURATION% %RESP(X-ENVOY-UPSTREAM-SERVICE-TIME)% "%REQ(X-FORWARDED-FOR)%" "%REQ(USER-AGENT)%" "%REQ(X-REQUEST-ID)%" "%REQ(:AUTHORITY)%" "%UPSTREAM_HOST%" "%UPSTREAM_CLUSTER%" + route_config: + name: otel_route + virtual_hosts: + - name: otel_backend + domains: ["*"] + routes: + - match: + prefix: "/v1/traces" + route: + cluster: opentelemetry_collector_http + timeout: 5s + retry_policy: + retry_on: "5xx,connect-failure,refused-stream,reset" + num_retries: 3 + per_try_timeout: 2s + host_selection_retry_max_attempts: 5 + retriable_status_codes: [500, 502, 503, 504] + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router +{% endif %} + - name: egress_traffic_llm address: socket_address: @@ -1014,7 +1043,6 @@ static_resources: port_value: 12001 hostname: arch_listener_llm - {% if "random_sampling" in arch_tracing and arch_tracing["random_sampling"] > 0 %} - name: opentelemetry_collector type: STRICT_DNS @@ -1048,4 +1076,19 @@ static_resources: socket_address: address: host.docker.internal port_value: 4318 + # Circuit breaker configuration to prevent overwhelming OTEL collector + circuit_breakers: + thresholds: + - priority: DEFAULT + max_connections: 100 + max_pending_requests: 100 + max_requests: 100 + max_retries: 3 + # Health checking and outlier detection + outlier_detection: + consecutive_5xx: 5 + interval: 10s + base_ejection_time: 30s + max_ejection_percent: 50 + enforcing_consecutive_5xx: 100 {% endif %} diff --git a/arch/tools/README.md b/arch/tools/README.md index 2503c8ae..c86934a8 100644 --- a/arch/tools/README.md +++ b/arch/tools/README.md @@ -19,7 +19,7 @@ source venv/bin/activate ### Step 3: Run the build script ```bash -pip install archgw==0.3.21 +pip install archgw==0.3.22 ``` ## Uninstall Instructions: archgw CLI diff --git a/arch/tools/cli/consts.py b/arch/tools/cli/consts.py index 40e4a705..b768e037 100644 --- a/arch/tools/cli/consts.py +++ b/arch/tools/cli/consts.py @@ -2,4 +2,4 @@ import os SERVICE_NAME_ARCHGW = "archgw" ARCHGW_DOCKER_NAME = "archgw" -ARCHGW_DOCKER_IMAGE = os.getenv("ARCHGW_DOCKER_IMAGE", "katanemo/archgw:0.3.21") +ARCHGW_DOCKER_IMAGE = os.getenv("ARCHGW_DOCKER_IMAGE", "katanemo/archgw:0.3.22") diff --git a/arch/tools/pyproject.toml b/arch/tools/pyproject.toml index c5b57a87..7bff20ba 100644 --- a/arch/tools/pyproject.toml +++ b/arch/tools/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "archgw" -version = "0.3.21" +version = "0.3.22" description = "Python-based CLI tool to manage Arch Gateway." authors = ["Katanemo Labs, Inc."] readme = "README.md" diff --git a/archgw.code-workspace b/archgw.code-workspace index f94e67b0..bd24f82a 100644 --- a/archgw.code-workspace +++ b/archgw.code-workspace @@ -34,6 +34,7 @@ "editor.defaultFormatter": "ms-python.black-formatter", "editor.formatOnSave": true }, + "rust-analyzer.cargo.features": ["trace-collection"] }, "extensions": { "recommendations": [ diff --git a/build_filter_image.sh b/build_filter_image.sh index 4e6e885e..7f1d7b31 100644 --- a/build_filter_image.sh +++ b/build_filter_image.sh @@ -1 +1 @@ -docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.21 +docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.22 diff --git a/crates/Cargo.lock b/crates/Cargo.lock index b243c365..09c86861 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -167,6 +167,61 @@ dependencies = [ "time", ] +[[package]] +name = "axum" +version = "0.7.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edca88bc138befd0323b20752846e6587272d3b03b0343c8ea28a6f819e6e71f" +dependencies = [ + "async-trait", + "axum-core", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "hyper 1.6.0", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "rustversion", + "serde", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower 0.5.2", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09f2bd6146b97ae3359fa0cc6d6b376d9539582c7b4220f041a33ec24c226199" +dependencies = [ + "async-trait", + "bytes", + "futures-util", + "http 1.3.1", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "rustversion", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backoff" version = "0.4.0" @@ -370,6 +425,7 @@ dependencies = [ name = "common" version = "0.1.0" dependencies = [ + "axum", "derivative", "duration-string", "governor", @@ -379,12 +435,16 @@ dependencies = [ "pretty_assertions", "proxy-wasm", "rand 0.8.5", + "reqwest", "serde", "serde_json", "serde_with", "serde_yaml", + "serial_test", "thiserror 1.0.69", "tiktoken-rs", + "tokio", + "tracing", "url", "urlencoding", ] @@ -1426,6 +1486,12 @@ dependencies = [ "regex-automata 0.1.10", ] +[[package]] +name = "matchit" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e7465ac9959cc2b1404e8e2367b43684a6d13790fe23056cc8c6c5a6b7bcb94" + [[package]] name = "md5" version = "0.7.0" @@ -2458,6 +2524,16 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59fab13f937fa393d08645bf3a84bdfe86e296747b506ada67bb15f10f218b2a" +dependencies = [ + "itoa", + "serde", +] + [[package]] name = "serde_urlencoded" version = "0.7.1" @@ -2981,6 +3057,7 @@ dependencies = [ "tokio", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -3019,6 +3096,7 @@ version = "0.1.41" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "784e0ac535deb450455cbfa28a6f0df145ea1bb7ae51b821cf5e7927fdcfbdd0" dependencies = [ + "log", "pin-project-lite", "tracing-attributes", "tracing-core", diff --git a/crates/Cargo.toml b/crates/Cargo.toml index 5cd6b29c..c22e252e 100644 --- a/crates/Cargo.toml +++ b/crates/Cargo.toml @@ -1,3 +1,7 @@ [workspace] resolver = "2" members = ["llm_gateway", "prompt_gateway", "common", "brightstaff", "hermesllm"] + +[workspace.metadata.rust-analyzer] +# Enable features for better IDE support +cargo.features = ["trace-collection"] diff --git a/crates/brightstaff/Cargo.toml b/crates/brightstaff/Cargo.toml index 3dfd1abe..2d88e213 100644 --- a/crates/brightstaff/Cargo.toml +++ b/crates/brightstaff/Cargo.toml @@ -7,7 +7,7 @@ edition = "2021" async-openai = "0.30.1" bytes = "1.10.1" chrono = "0.4" -common = { version = "0.1.0", path = "../common" } +common = { version = "0.1.0", path = "../common", features = ["trace-collection"] } eventsource-client = "0.15.0" eventsource-stream = "0.2.3" futures = "0.3.31" diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 65ce5ef8..a47a4435 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -37,7 +37,38 @@ pub async fn agent_chat( match handle_agent_chat(request, router_service, agents_list, listeners).await { Ok(response) => Ok(response), Err(err) => { - // Print detailed error information with full error chain + // Check if this is a client error from the pipeline that should be cascaded + if let AgentFilterChainError::Pipeline(PipelineError::ClientError { + agent, + status, + body, + }) = &err + { + warn!( + "Client error from agent '{}' (HTTP {}): {}", + agent, status, body + ); + + // Create error response with the original status code and body + let error_json = serde_json::json!({ + "error": "ClientError", + "agent": agent, + "status": status, + "agent_response": body + }); + + let json_string = error_json.to_string(); + let mut response = Response::new(ResponseHandler::create_full_body(json_string)); + *response.status_mut() = hyper::StatusCode::from_u16(*status) + .unwrap_or(hyper::StatusCode::INTERNAL_SERVER_ERROR); + response.headers_mut().insert( + hyper::header::CONTENT_TYPE, + "application/json".parse().unwrap(), + ); + return Ok(response); + } + + // Print detailed error information with full error chain for other errors let mut error_chain = Vec::new(); let mut current_error: &dyn std::error::Error = &err; diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs new file mode 100644 index 00000000..b3686fae --- /dev/null +++ b/crates/brightstaff/src/handlers/llm.rs @@ -0,0 +1,345 @@ +use bytes::Bytes; +use common::configuration::{LlmProvider, ModelAlias}; +use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER}; +use common::traces::TraceCollector; +use hermesllm::clients::SupportedAPIsFromClient; +use hermesllm::{ProviderRequest, ProviderRequestType}; +use http_body_util::combinators::BoxBody; +use http_body_util::{BodyExt, Full}; +use hyper::header::{self}; +use hyper::{Request, Response, StatusCode}; +use std::collections::HashMap; +use std::sync::Arc; +use tokio::sync::RwLock; +use tracing::{debug, warn}; + +use crate::router::llm_router::RouterService; +use crate::handlers::utils::{create_streaming_response, ObservableStreamProcessor, truncate_message}; +use crate::handlers::router_chat::router_chat_get_upstream_model; +use crate::tracing::operation_component; + +fn full>(chunk: T) -> BoxBody { + Full::new(chunk.into()) + .map_err(|never| match never {}) + .boxed() +} + +pub async fn llm_chat( + request: Request, + router_service: Arc, + full_qualified_llm_provider_url: String, + model_aliases: Arc>>, + llm_providers: Arc>>, + trace_collector: Arc, +) -> Result>, hyper::Error> { + + let request_path = request.uri().path().to_string(); + let request_headers = request.headers().clone(); + + // Extract or generate traceparent - this establishes the trace context for all spans + let traceparent: String = request_headers + .get("traceparent") + .and_then(|h| h.to_str().ok()) + .map(|s| s.to_string()) + .unwrap_or_else(|| { + use uuid::Uuid; + let trace_id = Uuid::new_v4().to_string().replace("-", ""); + format!("00-{}-0000000000000000-01", trace_id) + }); + + let mut request_headers = request_headers; + let chat_request_bytes = request.collect().await?.to_bytes(); + + debug!( + "Received request body (raw utf8): {}", + String::from_utf8_lossy(&chat_request_bytes) + ); + + let mut client_request = match ProviderRequestType::try_from(( + &chat_request_bytes[..], + &SupportedAPIsFromClient::from_endpoint(request_path.as_str()).unwrap(), + )) { + Ok(request) => request, + Err(err) => { + warn!("Failed to parse request as ProviderRequestType: {}", err); + let err_msg = format!("Failed to parse request: {}", err); + let mut bad_request = Response::new(full(err_msg)); + *bad_request.status_mut() = StatusCode::BAD_REQUEST; + return Ok(bad_request); + } + }; + + // Model alias resolution: update model field in client_request immediately + // This ensures all downstream objects use the resolved model + let model_from_request = client_request.model().to_string(); + let temperature = client_request.get_temperature(); + let is_streaming_request = client_request.is_streaming(); + let resolved_model = resolve_model_alias(&model_from_request, &model_aliases); + + // Extract tool names and user message preview for span attributes + let tool_names = client_request.get_tool_names(); + let user_message_preview = client_request.get_recent_user_message() + .map(|msg| truncate_message(&msg, 50)); + + client_request.set_model(resolved_model.clone()); + if client_request.remove_metadata_key("archgw_preference_config") { + debug!("Removed archgw_preference_config from metadata"); + } + + let client_request_bytes_for_upstream = ProviderRequestType::to_bytes(&client_request).unwrap(); + + // Determine routing using the dedicated router_chat module + let routing_result = match router_chat_get_upstream_model( + router_service, + client_request, // Pass the original request - router_chat will convert it + &request_headers, + trace_collector.clone(), + &traceparent, + &request_path, + ) + .await + { + Ok(result) => result, + Err(err) => { + let mut internal_error = Response::new(full(err.message)); + *internal_error.status_mut() = err.status_code; + return Ok(internal_error); + } + }; + + let model_name = routing_result.model_name; + + debug!( + "[ARCH_ROUTER] URL: {}, Resolved Model: {}", + full_qualified_llm_provider_url, model_name + ); + + request_headers.insert( + ARCH_PROVIDER_HINT_HEADER, + header::HeaderValue::from_str(&model_name).unwrap(), + ); + + request_headers.insert( + header::HeaderName::from_static(ARCH_IS_STREAMING_HEADER), + header::HeaderValue::from_str(&is_streaming_request.to_string()).unwrap(), + ); + // remove content-length header if it exists + request_headers.remove(header::CONTENT_LENGTH); + + // Capture start time right before sending request to upstream + let request_start_time = std::time::Instant::now(); + let request_start_system_time = std::time::SystemTime::now(); + + let llm_response = match reqwest::Client::new() + .post(full_qualified_llm_provider_url) + .headers(request_headers) + .body(client_request_bytes_for_upstream) + .send() + .await + { + Ok(res) => res, + Err(err) => { + let err_msg = format!("Failed to send request: {}", err); + let mut internal_error = Response::new(full(err_msg)); + *internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + return Ok(internal_error); + } + }; + + // copy over the headers and status code from the original response + let response_headers = llm_response.headers().clone(); + let upstream_status = llm_response.status(); + let mut response = Response::builder().status(upstream_status); + let headers = response.headers_mut().unwrap(); + for (header_name, header_value) in response_headers.iter() { + headers.insert(header_name, header_value.clone()); + } + + // Build LLM span with actual status code using constants + let byte_stream = llm_response.bytes_stream(); + + // Build the LLM span (will be finalized after streaming completes) + let llm_span = build_llm_span( + &traceparent, + &request_path, + &resolved_model, + &model_name, + upstream_status.as_u16(), + is_streaming_request, + request_start_system_time, + tool_names, + user_message_preview, + temperature, + &llm_providers, + ).await; + + // Use PassthroughProcessor to track streaming metrics and finalize the span + let processor = ObservableStreamProcessor::new( + trace_collector, + operation_component::LLM, + llm_span, + request_start_time, + ); + + let streaming_response = create_streaming_response(byte_stream, processor, 16); + + match response.body(streaming_response.body) { + Ok(response) => Ok(response), + Err(err) => { + let err_msg = format!("Failed to create response: {}", err); + let mut internal_error = Response::new(full(err_msg)); + *internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; + Ok(internal_error) + } + } +} + +/// Resolves model aliases by looking up the requested model in the model_aliases map. +/// Returns the target model if an alias is found, otherwise returns the original model. +fn resolve_model_alias( + model_from_request: &str, + model_aliases: &Arc>>, +) -> String { + if let Some(aliases) = model_aliases.as_ref() { + if let Some(model_alias) = aliases.get(model_from_request) { + debug!( + "Model Alias: 'From {}' -> 'To {}'", + model_from_request, model_alias.target + ); + return model_alias.target.clone(); + } + } + model_from_request.to_string() +} + +/// Builds the LLM span with all required and optional attributes. +async fn build_llm_span( + traceparent: &str, + request_path: &str, + resolved_model: &str, + model_name: &str, + status_code: u16, + is_streaming: bool, + start_time: std::time::SystemTime, + tool_names: Option>, + user_message_preview: Option, + temperature: Option, + llm_providers: &Arc>>, +) -> common::traces::Span { + use common::traces::{SpanBuilder, SpanKind, parse_traceparent}; + use crate::tracing::{http, llm, OperationNameBuilder}; + + // Calculate the upstream path based on provider configuration + let upstream_path = get_upstream_path( + llm_providers, + model_name, + request_path, + resolved_model, + is_streaming, + ).await; + + // Build operation name showing path transformation if different + let operation_name = if request_path != upstream_path { + OperationNameBuilder::new() + .with_method("POST") + .with_path(&format!("{} >> {}", request_path, upstream_path)) + .with_target(resolved_model) + .build() + } else { + OperationNameBuilder::new() + .with_method("POST") + .with_path(request_path) + .with_target(resolved_model) + .build() + }; + + let (trace_id, parent_span_id) = parse_traceparent(traceparent); + + let mut span_builder = SpanBuilder::new(&operation_name) + .with_trace_id(&trace_id) + .with_kind(SpanKind::Client) + .with_start_time(start_time) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::STATUS_CODE, status_code.to_string()) + .with_attribute(http::TARGET, request_path.to_string()) + .with_attribute(http::UPSTREAM_TARGET, upstream_path) + .with_attribute(llm::MODEL_NAME, resolved_model.to_string()) + .with_attribute(llm::IS_STREAMING, is_streaming.to_string()); + + // Only set parent span ID if it exists (not a root span) + if let Some(parent) = parent_span_id { + span_builder = span_builder.with_parent_span_id(&parent); + } + + // Add optional attributes + if let Some(temp) = temperature { + span_builder = span_builder.with_attribute(llm::TEMPERATURE, temp.to_string()); + } + + if let Some(tools) = tool_names { + let formatted_tools = tools.iter() + .map(|name| format!("{}(...)", name)) + .collect::>() + .join("\n"); + span_builder = span_builder.with_attribute(llm::TOOLS, formatted_tools); + } + + if let Some(preview) = user_message_preview { + span_builder = span_builder.with_attribute(llm::USER_MESSAGE_PREVIEW, preview); + } + + span_builder.build() +} + +/// Calculates the upstream path for the provider based on the model name. +/// Looks up provider configuration, gets the ProviderId and base_url_path_prefix, +/// then uses target_endpoint_for_provider to calculate the correct upstream path. +async fn get_upstream_path( + llm_providers: &Arc>>, + model_name: &str, + request_path: &str, + resolved_model: &str, + is_streaming: bool, +) -> String { + let providers_lock = llm_providers.read().await; + + // First, try to find by model name or provider name + let provider = providers_lock.iter().find(|p| { + p.model.as_ref().map(|m| m == model_name).unwrap_or(false) + || p.name == model_name + }); + + let (provider_id, base_url_path_prefix) = if let Some(provider) = provider { + let provider_id = provider.provider_interface.to_provider_id(); + let prefix = provider.base_url_path_prefix.clone(); + (provider_id, prefix) + } else { + let default_provider = providers_lock.iter().find(|p| { + p.default.unwrap_or(false) + }); + + if let Some(provider) = default_provider { + let provider_id = provider.provider_interface.to_provider_id(); + let prefix = provider.base_url_path_prefix.clone(); + (provider_id, prefix) + } else { + // Last resort: use OpenAI as hardcoded fallback + warn!("No default provider found, falling back to OpenAI"); + (hermesllm::ProviderId::OpenAI, None) + } + }; + + drop(providers_lock); + + // Calculate the upstream path using the proper API + let client_api = SupportedAPIsFromClient::from_endpoint(request_path) + .expect("Should have valid API endpoint"); + + client_api.target_endpoint_for_provider( + &provider_id, + request_path, + resolved_model, + is_streaming, + base_url_path_prefix.as_deref(), + ) +} diff --git a/crates/brightstaff/src/handlers/mod.rs b/crates/brightstaff/src/handlers/mod.rs index 881750fa..e63958d5 100644 --- a/crates/brightstaff/src/handlers/mod.rs +++ b/crates/brightstaff/src/handlers/mod.rs @@ -1,6 +1,7 @@ pub mod agent_chat_completions; pub mod agent_selector; -pub mod router; +pub mod llm; +pub mod router_chat; pub mod models; pub mod function_calling; pub mod pipeline_processor; diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index f01616c9..0bf38da1 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -22,6 +22,18 @@ pub enum PipelineError { NoChoicesInResponse(String), #[error("No content in response from agent '{0}'")] NoContentInResponse(String), + #[error("Client error from agent '{agent}' (HTTP {status}): {body}")] + ClientError { + agent: String, + status: u16, + body: String, + }, + #[error("Server error from agent '{agent}' (HTTP {status}): {body}")] + ServerError { + agent: String, + status: u16, + body: String, + }, } /// Service for processing agent pipelines @@ -182,55 +194,31 @@ impl PipelineProcessor { .send() .await?; + let status = response.status(); let response_bytes = response.bytes().await?; - info!( - "response bytes in str: {}", - String::from_utf8_lossy(&response_bytes) - ); + // Check for HTTP errors and handle them appropriately + if !status.is_success() { + let error_body = String::from_utf8_lossy(&response_bytes).to_string(); - let response_str = String::from_utf8_lossy(&response_bytes); - let lines: Vec<&str> = response_str.lines().collect(); - - // Validate SSE format: first line should be "event: message" - if lines.is_empty() || lines[0] != "event: message" { - warn!("Invalid SSE response format from agent {}: expected 'event: message' as first line, got: {:?}", agent.id, lines.first()); - return Err(PipelineError::NoContentInResponse(format!( - "Invalid SSE response format from agent {}: expected 'event: message' as first line", - agent.id - ))); + if status.is_client_error() { + // 4xx errors - cascade back to developer + return Err(PipelineError::ClientError { + agent: agent.id.clone(), + status: status.as_u16(), + body: error_body, + }); + } else if status.is_server_error() { + // 5xx errors - server/agent error + return Err(PipelineError::ServerError { + agent: agent.id.clone(), + status: status.as_u16(), + body: error_body, + }); + } } - // Find the data line - let data_lines: Vec<&str> = lines - .iter() - .filter(|line| line.starts_with("data: ")) - .copied() - .collect(); - if data_lines.len() != 1 { - warn!( - "Expected exactly one 'data:' line from agent {}, found {}", - agent.id, - data_lines.len() - ); - return Err(PipelineError::NoContentInResponse(format!( - "Expected exactly one 'data:' line from agent {}, found {}", - agent.id, - data_lines.len() - ))); - } - - let data_chunk = &data_lines[0][6..]; // Skip "data: " prefix - - let response: JsonRpcResponse = serde_json::from_str(data_chunk)?; - let response_result = response - .result - .ok_or_else(|| PipelineError::NoChoicesInResponse(agent.id.clone()))?; - - let response_json = response_result - .get("structuredContent") - .ok_or_else(|| PipelineError::NoChoicesInResponse(agent.id.clone()))?; // Parse the response as JSON to extract the content // let response_json: serde_json::Value = serde_json::from_slice(&response_bytes)?; diff --git a/crates/brightstaff/src/handlers/router.rs b/crates/brightstaff/src/handlers/router.rs deleted file mode 100644 index c369729a..00000000 --- a/crates/brightstaff/src/handlers/router.rs +++ /dev/null @@ -1,252 +0,0 @@ -use bytes::Bytes; -use common::configuration::{ModelAlias, ModelUsagePreference}; -use common::consts::{ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER}; -use hermesllm::apis::openai::ChatCompletionsRequest; -use hermesllm::clients::endpoints::SupportedUpstreamAPIs; -use hermesllm::clients::SupportedAPIsFromClient; -use hermesllm::{ProviderRequest, ProviderRequestType}; -use http_body_util::combinators::BoxBody; -use http_body_util::{BodyExt, Full}; -use hyper::header::{self}; -use hyper::{Request, Response, StatusCode}; -use std::collections::HashMap; -use std::sync::Arc; -use tracing::{debug, info, warn}; - -use crate::router::llm_router::RouterService; -use crate::handlers::utils::{create_streaming_response, PassthroughProcessor}; - -fn full>(chunk: T) -> BoxBody { - Full::new(chunk.into()) - .map_err(|never| match never {}) - .boxed() -} - -pub async fn router_chat( - request: Request, - router_service: Arc, - full_qualified_llm_provider_url: String, - model_aliases: Arc>>, -) -> Result>, hyper::Error> { - let request_path = request.uri().path().to_string(); - let mut request_headers = request.headers().clone(); - let chat_request_bytes = request.collect().await?.to_bytes(); - - debug!( - "Received request body (raw utf8): {}", - String::from_utf8_lossy(&chat_request_bytes) - ); - - let mut client_request = match ProviderRequestType::try_from(( - &chat_request_bytes[..], - &SupportedAPIsFromClient::from_endpoint(request_path.as_str()).unwrap(), - )) { - Ok(request) => request, - Err(err) => { - warn!("Failed to parse request as ProviderRequestType: {}", err); - let err_msg = format!("Failed to parse request: {}", err); - let mut bad_request = Response::new(full(err_msg)); - *bad_request.status_mut() = StatusCode::BAD_REQUEST; - return Ok(bad_request); - } - }; - - // Model alias resolution: update model field in client_request immediately - // This ensures all downstream objects use the resolved model - let model_from_request = client_request.model().to_string(); - let is_streaming_request = client_request.is_streaming(); - let resolved_model = if let Some(model_aliases) = model_aliases.as_ref() { - if let Some(model_alias) = model_aliases.get(&model_from_request) { - debug!( - "Model Alias: 'From {}' -> 'To {}'", - model_from_request, model_alias.target - ); - model_alias.target.clone() - } else { - model_from_request.clone() - } - } else { - model_from_request.clone() - }; - client_request.set_model(resolved_model.clone()); - - // Clone metadata for routing and remove archgw_preference_config from original - let routing_metadata = client_request.metadata().clone(); - - if client_request.remove_metadata_key("archgw_preference_config") { - debug!("Removed archgw_preference_config from metadata"); - } - - let client_request_bytes_for_upstream = ProviderRequestType::to_bytes(&client_request).unwrap(); - - // Convert to ChatCompletionsRequest regardless of input type (clone to avoid moving original) - let chat_completions_request_for_arch_router: ChatCompletionsRequest = - match ProviderRequestType::try_from(( - client_request, - &SupportedUpstreamAPIs::OpenAIChatCompletions( - hermesllm::apis::OpenAIApi::ChatCompletions, - ), - )) { - Ok(ProviderRequestType::ChatCompletionsRequest(req)) => req, - Ok( - ProviderRequestType::MessagesRequest(_) - | ProviderRequestType::BedrockConverse(_) - | ProviderRequestType::BedrockConverseStream(_) - | ProviderRequestType::ResponsesAPIRequest(_), - ) => { - // This should not happen after conversion to OpenAI format - warn!("Unexpected: got non-ChatCompletions request after converting to OpenAI format"); - let err_msg = "Request conversion failed".to_string(); - let mut bad_request = Response::new(full(err_msg)); - *bad_request.status_mut() = StatusCode::BAD_REQUEST; - return Ok(bad_request); - } - Err(err) => { - warn!( - "Failed to convert request to ChatCompletionsRequest: {}", - err - ); - let err_msg = format!("Failed to convert request: {}", err); - let mut bad_request = Response::new(full(err_msg)); - *bad_request.status_mut() = StatusCode::BAD_REQUEST; - return Ok(bad_request); - } - }; - - debug!( - "[ARCH_ROUTER REQ]: {}", - &serde_json::to_string(&chat_completions_request_for_arch_router).unwrap() - ); - - let trace_parent = request_headers - .iter() - .find(|(ty, _)| ty.as_str() == "traceparent") - .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); - - let usage_preferences_str: Option = routing_metadata.as_ref().and_then(|metadata| { - metadata - .get("archgw_preference_config") - .map(|value| value.to_string()) - }); - - let usage_preferences: Option> = usage_preferences_str - .as_ref() - .and_then(|s| serde_yaml::from_str(s).ok()); - - let latest_message_for_log = chat_completions_request_for_arch_router - .messages - .last() - .map_or("None".to_string(), |msg| { - msg.content.to_string().replace('\n', "\\n") - }); - - const MAX_MESSAGE_LENGTH: usize = 50; - let latest_message_for_log = if latest_message_for_log.chars().count() > MAX_MESSAGE_LENGTH { - let truncated: String = latest_message_for_log - .chars() - .take(MAX_MESSAGE_LENGTH) - .collect(); - format!("{}...", truncated) - } else { - latest_message_for_log - }; - - info!( - "request received, request type: chat_completion, usage preferences from request: {}, request path: {}, latest message: {}", - usage_preferences.is_some(), - request_path, - latest_message_for_log - ); - - debug!("usage preferences from request: {:?}", usage_preferences); - - let model_name = match router_service - .determine_route( - &chat_completions_request_for_arch_router.messages, - trace_parent.clone(), - usage_preferences, - ) - .await - { - Ok(route) => match route { - Some((_, model_name)) => model_name, - None => { - info!( - "No route determined, using default model from request: {}", - chat_completions_request_for_arch_router.model - ); - chat_completions_request_for_arch_router.model.clone() - } - }, - Err(err) => { - let err_msg = format!("Failed to determine route: {}", err); - let mut internal_error = Response::new(full(err_msg)); - *internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - return Ok(internal_error); - } - }; - - debug!( - "[ARCH_ROUTER] URL: {}, Resolved Model: {}", - full_qualified_llm_provider_url, model_name - ); - - request_headers.insert( - ARCH_PROVIDER_HINT_HEADER, - header::HeaderValue::from_str(&model_name).unwrap(), - ); - - request_headers.insert( - header::HeaderName::from_static(ARCH_IS_STREAMING_HEADER), - header::HeaderValue::from_str(&is_streaming_request.to_string()).unwrap(), - ); - - if let Some(trace_parent) = trace_parent { - request_headers.insert( - header::HeaderName::from_static("traceparent"), - header::HeaderValue::from_str(&trace_parent).unwrap(), - ); - } - // remove content-length header if it exists - request_headers.remove(header::CONTENT_LENGTH); - - let llm_response = match reqwest::Client::new() - .post(full_qualified_llm_provider_url) - .headers(request_headers) - .body(client_request_bytes_for_upstream) - .send() - .await - { - Ok(res) => res, - Err(err) => { - let err_msg = format!("Failed to send request: {}", err); - let mut internal_error = Response::new(full(err_msg)); - *internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - return Ok(internal_error); - } - }; - - // copy over the headers and status code from the original response - let response_headers = llm_response.headers().clone(); - let upstream_status = llm_response.status(); - let mut response = Response::builder().status(upstream_status); - let headers = response.headers_mut().unwrap(); - for (header_name, header_value) in response_headers.iter() { - headers.insert(header_name, header_value.clone()); - } - - // Use the streaming utility with a passthrough processor (no modification of chunks) - let byte_stream = llm_response.bytes_stream(); - let processor = PassthroughProcessor; - let streaming_response = create_streaming_response(byte_stream, processor, 16); - - match response.body(streaming_response.body) { - Ok(response) => Ok(response), - Err(err) => { - let err_msg = format!("Failed to create response: {}", err); - let mut internal_error = Response::new(full(err_msg)); - *internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR; - Ok(internal_error) - } - } -} diff --git a/crates/brightstaff/src/handlers/router_chat.rs b/crates/brightstaff/src/handlers/router_chat.rs new file mode 100644 index 00000000..09b09975 --- /dev/null +++ b/crates/brightstaff/src/handlers/router_chat.rs @@ -0,0 +1,243 @@ +use common::configuration::ModelUsagePreference; +use common::traces::{TraceCollector, SpanKind, SpanBuilder, parse_traceparent}; +use hermesllm::clients::endpoints::SupportedUpstreamAPIs; +use hermesllm::{ProviderRequest, ProviderRequestType}; +use hyper::StatusCode; +use std::collections::HashMap; +use std::sync::Arc; +use tracing::{debug, info, warn}; + +use crate::router::llm_router::RouterService; +use crate::tracing::{OperationNameBuilder, operation_component, http, routing}; + +pub struct RoutingResult { + pub model_name: String +} + +pub struct RoutingError { + pub message: String, + pub status_code: StatusCode, +} + +impl RoutingError { + pub fn internal_error(message: String) -> Self { + Self { + message, + status_code: StatusCode::INTERNAL_SERVER_ERROR + } + } +} + +/// Determines the routing decision if +/// +/// # Returns +/// * `Ok(RoutingResult)` - Contains the selected model name and span ID +/// * `Err(RoutingError)` - Contains error details and optional span ID +pub async fn router_chat_get_upstream_model( + router_service: Arc, + client_request: ProviderRequestType, + request_headers: &hyper::HeaderMap, + trace_collector: Arc, + traceparent: &str, + request_path: &str, +) -> Result { + // Clone metadata for routing before converting (which consumes client_request) + let routing_metadata = client_request.metadata().clone(); + + // Convert to ChatCompletionsRequest for routing (regardless of input type) + let chat_request = match ProviderRequestType::try_from(( + client_request, + &SupportedUpstreamAPIs::OpenAIChatCompletions( + hermesllm::apis::OpenAIApi::ChatCompletions, + ), + )) { + Ok(ProviderRequestType::ChatCompletionsRequest(req)) => req, + Ok( + ProviderRequestType::MessagesRequest(_) + | ProviderRequestType::BedrockConverse(_) + | ProviderRequestType::BedrockConverseStream(_) + | ProviderRequestType::ResponsesAPIRequest(_), + ) => { + warn!("Unexpected: got non-ChatCompletions request after converting to OpenAI format"); + return Err(RoutingError::internal_error( + "Request conversion failed".to_string(), + )); + } + Err(err) => { + warn!("Failed to convert request to ChatCompletionsRequest: {}", err); + return Err(RoutingError::internal_error(format!( + "Failed to convert request: {}", + err + ))); + } + }; + + debug!( + "[ARCH_ROUTER REQ]: {}", + &serde_json::to_string(&chat_request).unwrap() + ); + + // Extract trace_parent from headers + let trace_parent = request_headers + .iter() + .find(|(ty, _)| ty.as_str() == "traceparent") + .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); + + // Extract usage preferences from metadata + let usage_preferences_str: Option = routing_metadata.as_ref().and_then(|metadata| { + metadata + .get("archgw_preference_config") + .map(|value| value.to_string()) + }); + + let usage_preferences: Option> = usage_preferences_str + .as_ref() + .and_then(|s| serde_yaml::from_str(s).ok()); + + // Prepare log message with latest message from chat request + let latest_message_for_log = chat_request + .messages + .last() + .map_or("None".to_string(), |msg| { + msg.content.to_string().replace('\n', "\\n") + }); + + const MAX_MESSAGE_LENGTH: usize = 50; + let latest_message_for_log = if latest_message_for_log.chars().count() > MAX_MESSAGE_LENGTH { + let truncated: String = latest_message_for_log + .chars() + .take(MAX_MESSAGE_LENGTH) + .collect(); + format!("{}...", truncated) + } else { + latest_message_for_log + }; + + info!( + "request received, request type: chat_completion, usage preferences from request: {}, request path: {}, latest message: {}", + usage_preferences.is_some(), + request_path, + latest_message_for_log + ); + + debug!("usage preferences from request: {:?}", usage_preferences); + + // Capture start time for routing span + let routing_start_time = std::time::Instant::now(); + let routing_start_system_time = std::time::SystemTime::now(); + + // Attempt to determine route using the router service + let routing_result = router_service + .determine_route(&chat_request.messages, trace_parent, usage_preferences) + .await; + + match routing_result { + Ok(route) => match route { + Some((_, model_name)) => { + // Record successful routing span + let mut attrs: HashMap = HashMap::new(); + attrs.insert("route.selected_model".to_string(), model_name.clone()); + record_routing_span( + trace_collector, + traceparent, + routing_start_time, + routing_start_system_time, + attrs, + ) + .await; + + Ok(RoutingResult { + model_name + }) + } + None => { + // No route determined, use default model from request + info!( + "No route determined, using default model from request: {}", + chat_request.model + ); + + let default_model = chat_request.model.clone(); + let mut attrs = HashMap::new(); + attrs.insert("route.selected_model".to_string(), default_model.clone()); + record_routing_span( + trace_collector, + traceparent, + routing_start_time, + routing_start_system_time, + attrs, + ) + .await; + + Ok(RoutingResult { + model_name: default_model + }) + } + }, + Err(err) => { + // Record failed routing span + let mut attrs = HashMap::new(); + attrs.insert("route.selected_model".to_string(), "unknown".to_string()); + attrs.insert("error.message".to_string(), err.to_string()); + record_routing_span( + trace_collector, + traceparent, + routing_start_time, + routing_start_system_time, + attrs, + ) + .await; + + Err(RoutingError::internal_error( + format!("Failed to determine route: {}", err) + )) + } + } +} + +/// Helper function to record a routing span with the given attributes. +/// Reduces code duplication across different routing outcomes. +async fn record_routing_span( + trace_collector: Arc, + traceparent: &str, + start_time: std::time::Instant, + start_system_time: std::time::SystemTime, + attrs: HashMap, +) { + // The routing always uses OpenAI Chat Completions format internally, + // so we log that as the actual API being used for routing + let routing_api_path = "/v1/chat/completions"; + + let routing_operation_name = OperationNameBuilder::new() + .with_method("POST") + .with_path(routing_api_path) + .with_target("Arch-Router-1.5B") + .build(); + + let (trace_id, parent_span_id) = parse_traceparent(traceparent); + + // Build the routing span directly using constants + let mut span_builder = SpanBuilder::new(&routing_operation_name) + .with_trace_id(&trace_id) + .with_kind(SpanKind::Client) + .with_start_time(start_system_time) + .with_end_time(std::time::SystemTime::now()) + .with_attribute(http::METHOD, "POST") + .with_attribute(http::TARGET, routing_api_path.to_string()) + .with_attribute(routing::ROUTE_DETERMINATION_MS, start_time.elapsed().as_millis().to_string()); + + // Only set parent span ID if it exists (not a root span) + if let Some(parent) = parent_span_id { + span_builder = span_builder.with_parent_span_id(&parent); + } + + // Add all custom attributes + for (key, value) in attrs { + span_builder = span_builder.with_attribute(key, value); + } + + let span = span_builder.build(); + + // Record the span directly to the collector + trace_collector.record_span(operation_component::ROUTING, span); +} diff --git a/crates/brightstaff/src/handlers/utils.rs b/crates/brightstaff/src/handlers/utils.rs index 2d000874..6f84c1f3 100644 --- a/crates/brightstaff/src/handlers/utils.rs +++ b/crates/brightstaff/src/handlers/utils.rs @@ -1,18 +1,27 @@ use bytes::Bytes; +use common::traces::{Span, Attribute, AttributeValue, TraceCollector, Event}; use http_body_util::combinators::BoxBody; use http_body_util::StreamBody; use hyper::body::Frame; +use std::sync::Arc; +use std::time::{Instant, SystemTime}; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; use tracing::warn; +// Import tracing constants +use crate::tracing::{llm, error}; + /// Trait for processing streaming chunks /// Implementors can inject custom logic during streaming (e.g., hallucination detection, logging) pub trait StreamProcessor: Send + 'static { /// Process an incoming chunk of bytes fn process_chunk(&mut self, chunk: Bytes) -> Result, String>; + /// Called when the first bytes are received (for time-to-first-token tracking) + fn on_first_bytes(&mut self) {} + /// Called when streaming completes successfully fn on_complete(&mut self) {} @@ -20,13 +29,152 @@ pub trait StreamProcessor: Send + 'static { fn on_error(&mut self, _error: &str) {} } -/// A no-op processor that just forwards chunks as-is -pub struct PassthroughProcessor; +/// A processor that tracks streaming metrics and finalizes the span +pub struct ObservableStreamProcessor { + collector: Arc, + service_name: String, + span: Span, + total_bytes: usize, + chunk_count: usize, + start_time: Instant, + time_to_first_token: Option, +} -impl StreamProcessor for PassthroughProcessor { +impl ObservableStreamProcessor { + /// Create a new passthrough processor + /// + /// # Arguments + /// * `collector` - The trace collector to record the span to + /// * `service_name` - The service name for this span (e.g., "archgw(llm)") + /// * `span` - The span to finalize after streaming completes + /// * `start_time` - When the request started (for duration calculation) + pub fn new( + collector: Arc, + service_name: impl Into, + span: Span, + start_time: Instant, + ) -> Self { + Self { + collector, + service_name: service_name.into(), + span, + total_bytes: 0, + chunk_count: 0, + start_time, + time_to_first_token: None, + } + } +} + +impl StreamProcessor for ObservableStreamProcessor { fn process_chunk(&mut self, chunk: Bytes) -> Result, String> { + self.total_bytes += chunk.len(); + self.chunk_count += 1; Ok(Some(chunk)) } + + fn on_first_bytes(&mut self) { + // Record time to first token (only for streaming) + if self.time_to_first_token.is_none() { + self.time_to_first_token = Some(self.start_time.elapsed().as_millis()); + } + } + + fn on_complete(&mut self) { + // Update span with streaming metrics and end time + let end_time_nanos = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + + self.span.end_time_unix_nano = format!("{}", end_time_nanos); + + // Add streaming metrics as attributes using constants + self.span.attributes.push(Attribute { + key: llm::RESPONSE_BYTES.to_string(), + value: AttributeValue { + string_value: Some(self.total_bytes.to_string()), + }, + }); + + + self.span.attributes.push(Attribute { + key: llm::DURATION_MS.to_string(), + value: AttributeValue { + string_value: Some(self.start_time.elapsed().as_millis().to_string()), + }, + }); + + // Add time to first token if available (streaming only) + if let Some(ttft) = self.time_to_first_token { + self.span.attributes.push(Attribute { + key: llm::TIME_TO_FIRST_TOKEN_MS.to_string(), + value: AttributeValue { + string_value: Some(ttft.to_string()), + }, + }); + + // Add time to first token as a span event + // Calculate the timestamp by adding ttft duration to span start time + if let Ok(start_time_nanos) = self.span.start_time_unix_nano.parse::() { + // Convert ttft from milliseconds to nanoseconds and add to start time + let event_timestamp = start_time_nanos + (ttft * 1_000_000); + let mut event = Event::new(llm::TIME_TO_FIRST_TOKEN_MS.to_string(), event_timestamp); + event.add_attribute( + llm::TIME_TO_FIRST_TOKEN_MS.to_string(), + ttft.to_string(), + ); + + // Initialize events vector if needed + if self.span.events.is_none() { + self.span.events = Some(Vec::new()); + } + + if let Some(ref mut events) = self.span.events { + events.push(event); + } + } + } + + // Record the finalized span + self.collector.record_span(&self.service_name, self.span.clone()); + } + + fn on_error(&mut self, error_msg: &str) { + warn!("Stream error in PassthroughProcessor: {}", error_msg); + + // Update span with error info and end time + let end_time_nanos = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos(); + + self.span.end_time_unix_nano = format!("{}", end_time_nanos); + + self.span.attributes.push(Attribute { + key: error::ERROR.to_string(), + value: AttributeValue { + string_value: Some("true".to_string()), + }, + }); + + self.span.attributes.push(Attribute { + key: error::MESSAGE.to_string(), + value: AttributeValue { + string_value: Some(error_msg.to_string()), + }, + }); + + self.span.attributes.push(Attribute { + key: llm::DURATION_MS.to_string(), + value: AttributeValue { + string_value: Some(self.start_time.elapsed().as_millis().to_string()), + }, + }); + + // Record the error span + self.collector.record_span(&self.service_name, self.span.clone()); + } } /// Result of creating a streaming response @@ -48,6 +196,8 @@ where // Spawn a task to process and forward chunks let processor_handle = tokio::spawn(async move { + let mut is_first_chunk = true; + while let Some(item) = byte_stream.next().await { let chunk = match item { Ok(chunk) => chunk, @@ -59,6 +209,12 @@ where } }; + // Call on_first_bytes for the first chunk + if is_first_chunk { + processor.on_first_bytes(); + is_first_chunk = false; + } + // Process the chunk match processor.process_chunk(chunk) { Ok(Some(processed_chunk)) => { @@ -91,3 +247,13 @@ where processor_handle, } } + +/// Truncates a message to the specified maximum length, adding "..." if truncated. +pub fn truncate_message(message: &str, max_length: usize) -> String { + if message.chars().count() > max_length { + let truncated: String = message.chars().take(max_length).collect(); + format!("{}...", truncated) + } else { + message.to_string() + } +} diff --git a/crates/brightstaff/src/lib.rs b/crates/brightstaff/src/lib.rs index a4b5b3ae..ceff49f1 100644 --- a/crates/brightstaff/src/lib.rs +++ b/crates/brightstaff/src/lib.rs @@ -1,3 +1,4 @@ pub mod handlers; pub mod router; +pub mod tracing; pub mod utils; diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 40e4a0b3..d73fe3df 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -1,5 +1,5 @@ use brightstaff::handlers::agent_chat_completions::agent_chat; -use brightstaff::handlers::router::router_chat; +use brightstaff::handlers::llm::llm_chat; use brightstaff::handlers::models::list_models; use brightstaff::handlers::function_calling::{function_calling_chat_handler}; use brightstaff::router::llm_router::RouterService; @@ -7,6 +7,7 @@ use brightstaff::utils::tracing::init_tracer; use bytes::Bytes; use common::configuration::{Agent, Configuration}; use common::consts::{CHAT_COMPLETIONS_PATH, MESSAGES_PATH, OPENAI_RESPONSES_API_PATH}; +use common::traces::TraceCollector; use http_body_util::{combinators::BoxBody, BodyExt, Empty}; use hyper::body::Incoming; use hyper::server::conn::http1; @@ -46,10 +47,6 @@ async fn main() -> Result<(), Box> { let _tracer_provider = init_tracer(); let bind_address = env::var("BIND_ADDRESS").unwrap_or_else(|_| BIND_ADDRESS.to_string()); - info!( - "current working directory: {}", - env::current_dir().unwrap().display() - ); // loading arch_config.yaml file let arch_config_path = env::var("ARCH_CONFIG_PATH_RENDERED") .unwrap_or_else(|_| "./arch_config_rendered.yaml".to_string()); @@ -76,19 +73,10 @@ async fn main() -> Result<(), Box> { let llm_providers = Arc::new(RwLock::new(arch_config.model_providers.clone())); let agents_list = Arc::new(RwLock::new(Some(all_agents))); let listeners = Arc::new(RwLock::new(arch_config.listeners.clone())); - - debug!( - "arch_config: {:?}", - &serde_json::to_string(arch_config.as_ref()).unwrap() - ); - let llm_provider_url = env::var("LLM_PROVIDER_ENDPOINT").unwrap_or_else(|_| "http://localhost:12001".to_string()); - info!("llm provider url: {}", llm_provider_url); - info!("listening on http://{}", bind_address); let listener = TcpListener::bind(bind_address).await?; - let routing_model_name: String = arch_config .routing .as_ref() @@ -110,18 +98,33 @@ async fn main() -> Result<(), Box> { let model_aliases = Arc::new(arch_config.model_aliases.clone()); + // Initialize trace collector and start background flusher + // Tracing is enabled if the tracing config is present in arch_config.yaml + // Pass Some(true/false) to override, or None to use env var OTEL_TRACING_ENABLED + let tracing_enabled = if arch_config.tracing.is_some() { + info!("Tracing configuration found in arch_config.yaml"); + Some(true) + } else { + info!("No tracing configuration in arch_config.yaml, will check OTEL_TRACING_ENABLED env var"); + None + }; + let trace_collector = Arc::new(TraceCollector::new(tracing_enabled)); + let _flusher_handle = trace_collector.clone().start_background_flusher(); + + loop { let (stream, _) = listener.accept().await?; let peer_addr = stream.peer_addr()?; let io = TokioIo::new(stream); let router_service: Arc = Arc::clone(&router_service); - let model_aliases = Arc::clone(&model_aliases); + let model_aliases: Arc>> = Arc::clone(&model_aliases); let llm_provider_url = llm_provider_url.clone(); let llm_providers = llm_providers.clone(); let agents_list = agents_list.clone(); let listeners = listeners.clone(); + let trace_collector = trace_collector.clone(); let service = service_fn(move |req| { let router_service = Arc::clone(&router_service); let parent_cx = extract_context_from_request(&req); @@ -130,13 +133,14 @@ async fn main() -> Result<(), Box> { let model_aliases = Arc::clone(&model_aliases); let agents_list = agents_list.clone(); let listeners = listeners.clone(); + let trace_collector = trace_collector.clone(); async move { match (req.method(), req.uri().path()) { (&Method::POST, CHAT_COMPLETIONS_PATH | MESSAGES_PATH | OPENAI_RESPONSES_API_PATH) => { let fully_qualified_url = format!("{}{}", llm_provider_url, req.uri().path()); - router_chat(req, router_service, fully_qualified_url, model_aliases) + llm_chat(req, router_service, fully_qualified_url, model_aliases, llm_providers, trace_collector) .with_context(parent_cx) .await } diff --git a/crates/brightstaff/src/tracing/constants.rs b/crates/brightstaff/src/tracing/constants.rs new file mode 100644 index 00000000..bd946aac --- /dev/null +++ b/crates/brightstaff/src/tracing/constants.rs @@ -0,0 +1,319 @@ +/// OpenTelemetry Semantic Conventions +/// +/// This module defines standard attribute keys following OTEL semantic conventions. +/// See: https://opentelemetry.io/docs/specs/semconv/ + +// ============================================================================= +// Span Attributes - HTTP +// ============================================================================= + +/// Semantic conventions for HTTP-related span attributes +pub mod http { + /// HTTP request method + /// Example: "GET", "POST", "PUT" + pub const METHOD: &str = "http.method"; + + /// HTTP response status code + /// Example: "200", "404", "500" + pub const STATUS_CODE: &str = "http.status_code"; + + /// Full HTTP request URL + pub const URL: &str = "http.url"; + + /// HTTP request target (path + query) + /// Example: "/v1/chat/completions?stream=true" + pub const TARGET: &str = "http.target"; + + /// Upstream target path after routing transformation + /// Example: "/api/paas/v4/chat/completions" (for Zhipu provider) + pub const UPSTREAM_TARGET: &str = "http.upstream_target"; + + /// HTTP request scheme + /// Example: "http", "https" + pub const SCHEME: &str = "http.scheme"; + + /// Value of the HTTP User-Agent header + pub const USER_AGENT: &str = "http.user_agent"; + + /// Size of the request payload body in bytes + pub const REQUEST_CONTENT_LENGTH: &str = "http.request_content_length"; + + /// Size of the response payload body in bytes + pub const RESPONSE_CONTENT_LENGTH: &str = "http.response_content_length"; +} + +// ============================================================================= +// Span Attributes - LLM Specific +// ============================================================================= + +/// Custom attributes for LLM operations +/// These follow the emerging OTEL GenAI semantic conventions +pub mod llm { + /// Name of the LLM model being called + /// Example: "gpt-4", "claude-3-sonnet", "llama-2-70b" + pub const MODEL_NAME: &str = "llm.model"; + + /// Provider of the LLM + /// Example: "openai", "anthropic", "azure-openai" + pub const PROVIDER: &str = "llm.provider"; + + /// Type of LLM operation + /// Example: "chat", "completion", "embedding" + pub const OPERATION_TYPE: &str = "llm.operation_type"; + + /// Whether the request is streaming + pub const IS_STREAMING: &str = "llm.is_streaming"; + + /// Total bytes received in the response + pub const RESPONSE_BYTES: &str = "llm.response_bytes"; + + /// Duration of the LLM call in milliseconds + pub const DURATION_MS: &str = "llm.duration_ms"; + + /// Time to first token in milliseconds (streaming only) + pub const TIME_TO_FIRST_TOKEN_MS: &str = "llm.time_to_first_token"; + + /// Number of prompt tokens used + pub const PROMPT_TOKENS: &str = "llm.usage.prompt_tokens"; + + /// Number of completion tokens generated + pub const COMPLETION_TOKENS: &str = "llm.usage.completion_tokens"; + + /// Total tokens used (prompt + completion) + pub const TOTAL_TOKENS: &str = "llm.usage.total_tokens"; + + /// Temperature parameter used + pub const TEMPERATURE: &str = "llm.temperature"; + + /// Max tokens parameter used + pub const MAX_TOKENS: &str = "llm.max_tokens"; + + /// Top-p parameter used + pub const TOP_P: &str = "llm.top_p"; + + /// List of tool names provided in the request + pub const TOOLS: &str = "llm.tools"; + + /// Preview of the user message (truncated) + pub const USER_MESSAGE_PREVIEW: &str = "llm.user_message_preview"; +} + +// ============================================================================= +// Span Attributes - Routing & Gateway +// ============================================================================= + +/// Attributes specific to LLM routing and gateway operations +pub mod routing { + /// Strategy used to select the LLM endpoint + /// Example: "round-robin", "least-latency", "cost-optimized" + pub const STRATEGY: &str = "routing.strategy"; + + /// Selected upstream endpoint + pub const UPSTREAM_ENDPOINT: &str = "routing.upstream_endpoint"; + + /// Time taken to determine the route in milliseconds + pub const ROUTE_DETERMINATION_MS: &str = "routing.determination_ms"; + + /// Whether a fallback endpoint was used + pub const IS_FALLBACK: &str = "routing.is_fallback"; + + /// Reason for route selection + pub const SELECTION_REASON: &str = "routing.selection_reason"; +} + +// ============================================================================= +// Span Attributes - Error Handling +// ============================================================================= + +/// Attributes for error and exception tracking +pub mod error { + /// Whether an error occurred + pub const ERROR: &str = "error"; + + /// Type/class of the error + /// Example: "TimeoutError", "AuthenticationError" + pub const TYPE: &str = "error.type"; + + /// Error message + pub const MESSAGE: &str = "error.message"; + + /// Stack trace of the error + pub const STACK_TRACE: &str = "error.stack_trace"; +} + +// ============================================================================= +// Operation Names +// ============================================================================= + +/// Canonical operation name components for Arch Gateway +pub mod operation_component { + /// Inbound request handling + pub const INBOUND: &str = "plano(inbound)"; + + /// Routing decision phase + pub const ROUTING: &str = "plano(routing)"; + + /// Handoff to upstream service + pub const HANDOFF: &str = "plano(handoff)"; + + /// Agent filter execution + pub const AGENT_FILTER: &str = "plano(agent filter)"; + + /// Agent execution + pub const AGENT: &str = "plano(agent)"; + + /// LLM call + pub const LLM: &str = "plano(llm)"; +} + +/// Builder for constructing standardized operation names +/// +/// Format: `{method} {path} {target}` +/// +/// The operation component (e.g., "archgw(llm)") is now part of the service name, +/// so the operation name focuses on the HTTP request details and target. +/// +/// # Examples +/// ``` +/// use brightstaff::tracing::OperationNameBuilder; +/// +/// // LLM call operation: "POST /v1/chat/completions gpt-4" +/// // (service name will be "archgw(llm)") +/// let op = OperationNameBuilder::new() +/// .with_method("POST") +/// .with_path("/v1/chat/completions") +/// .with_target("gpt-4") +/// .build(); +/// +/// // Agent filter operation: "POST /agents/v1/chat/completions hallucination-detector" +/// // (service name will be "archgw(agent filter)") +/// let op = OperationNameBuilder::new() +/// .with_method("POST") +/// .with_path("/agents/v1/chat/completions") +/// .with_target("hallucination-detector") +/// .build(); +/// +/// // Routing operation: "POST /v1/chat/completions" +/// // (service name will be "archgw(routing)") +/// let op = OperationNameBuilder::new() +/// .with_method("POST") +/// .with_path("/v1/chat/completions") +/// .build(); +/// ``` +pub struct OperationNameBuilder { + method: Option, + path: Option, + target: Option, +} + +impl OperationNameBuilder { + /// Create a new operation name builder + pub fn new() -> Self { + Self { + method: None, + path: None, + target: None, + } + } + + /// Set the HTTP method + /// + /// # Arguments + /// * `method` - HTTP method (e.g., "GET", "POST", "PUT") + pub fn with_method(mut self, method: impl Into) -> Self { + self.method = Some(method.into()); + self + } + + /// Set the request path + /// + /// # Arguments + /// * `path` - Request path (e.g., "/v1/chat/completions", "/agents/v1/chat/completions") + pub fn with_path(mut self, path: impl Into) -> Self { + self.path = Some(path.into()); + self + } + + /// Set the target (model name, agent name, or filter name) + /// + /// # Arguments + /// * `target` - Target identifier (e.g., "gpt-4", "my-agent", "hallucination-detector") + pub fn with_target(mut self, target: impl Into) -> Self { + self.target = Some(target.into()); + self + } + + /// Build the operation name string + /// + /// # Format + /// - With all components: `{method} {path} {target}` + /// - Without target: `{method} {path}` + /// - Without path: `{method}` + /// - Empty: returns empty string + pub fn build(self) -> String { + let mut parts = Vec::new(); + + if let Some(method) = self.method { + parts.push(method); + } + + if let Some(path) = self.path { + parts.push(path); + } + + if let Some(target) = self.target { + parts.push(target); + } + + parts.join(" ") + } +} + +impl Default for OperationNameBuilder { + fn default() -> Self { + Self::new() + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_operation_name_full() { + let op = OperationNameBuilder::new() + .with_method("POST") + .with_path("/v1/chat/completions") + .with_target("gpt-4") + .build(); + + assert_eq!(op, "POST /v1/chat/completions gpt-4"); + } + + #[test] + fn test_operation_name_no_target() { + let op = OperationNameBuilder::new() + .with_method("POST") + .with_path("/v1/chat/completions") + .build(); + + assert_eq!(op, "POST /v1/chat/completions"); + } + + #[test] + fn test_operation_name_agent_filter() { + let op = OperationNameBuilder::new() + .with_method("POST") + .with_path("/agents/v1/chat/completions") + .with_target("content-filter") + .build(); + + assert_eq!(op, "POST /agents/v1/chat/completions content-filter"); + } + + #[test] + fn test_operation_name_minimal() { + let op = OperationNameBuilder::new().build(); + assert_eq!(op, ""); + } +} diff --git a/crates/brightstaff/src/tracing/mod.rs b/crates/brightstaff/src/tracing/mod.rs new file mode 100644 index 00000000..bacc9571 --- /dev/null +++ b/crates/brightstaff/src/tracing/mod.rs @@ -0,0 +1,3 @@ +mod constants; + +pub use constants::{OperationNameBuilder, operation_component, http, llm, error, routing}; diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index aa95e2e4..4c659bfe 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -21,6 +21,18 @@ url = "2.5.4" hermesllm = { version = "0.1.0", path = "../hermesllm" } serde_with = "3.13.0" +# Optional dependencies for trace collection (not available in WASM) +tokio = { version = "1.44", features = ["sync", "time"], optional = true } +reqwest = { version = "0.12", features = ["json"], optional = true } +tracing = { version = "0.1", optional = true } + +[features] +default = [] +trace-collection = ["tokio", "reqwest", "tracing"] + [dev-dependencies] pretty_assertions = "1.4.1" serde_json = "1.0.64" +serial_test = "3.2" +axum = "0.7" +tokio = { version = "1.44", features = ["sync", "time", "macros", "rt"] } diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 76c368f1..9c8f5787 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -11,4 +11,5 @@ pub mod routing; pub mod stats; pub mod tokenizer; pub mod tracing; +pub mod traces; pub mod utils; diff --git a/crates/common/src/traces/collector.rs b/crates/common/src/traces/collector.rs new file mode 100644 index 00000000..0fc407b2 --- /dev/null +++ b/crates/common/src/traces/collector.rs @@ -0,0 +1,285 @@ +use super::shapes::Span; +use super::resource_span_builder::ResourceSpanBuilder; +use std::collections::{HashMap, VecDeque}; +use std::sync::Arc; +use tokio::sync::Mutex; +use tokio::time::{interval, Duration}; +use tracing::{debug, error, warn}; + +/// Parse W3C traceparent header into trace_id and parent_span_id +/// Format: "00-{trace_id}-{parent_span_id}-01" +/// +/// Returns (trace_id, Option) +/// - parent_span_id is None if it's all zeros (0000000000000000), indicating a root span +pub fn parse_traceparent(traceparent: &str) -> (String, Option) { + let parts: Vec<&str> = traceparent.split('-').collect(); + if parts.len() == 4 { + let trace_id = parts[1].to_string(); + let parent_span_id = parts[2].to_string(); + + // If parent_span_id is all zeros, this is a root span with no parent + let parent = if parent_span_id == "0000000000000000" { + None + } else { + Some(parent_span_id) + }; + + (trace_id, parent) + } else { + warn!("Invalid traceparent format: {}", traceparent); + // Return empty trace ID and None for parent if parsing fails + (String::new(), None) + } +} + +/// Collects and batches spans, flushing them to an OTEL collector +/// +/// Supports multiple services, with each service (e.g., "archgw(routing)", "archgw(llm)") +/// maintaining its own span queue. Flushes all services together periodically. +/// +/// Tracing can be enabled/disabled in two ways: +/// 1. Via arch_config.yaml: presence of `tracing` configuration section +/// 2. Via environment variable: `OTEL_TRACING_ENABLED=true/false` +/// +/// When disabled, span recording and flushing are no-ops. +pub struct TraceCollector { + /// Spans grouped by service name + /// Key: service name (e.g., "archgw(routing)", "archgw(llm)") + /// Value: queue of spans for that service + spans_by_service: Arc>>>, + flush_interval: Duration, + otel_url: String, + /// Whether tracing is enabled + enabled: bool, +} + +impl TraceCollector { + /// Create a new trace collector + /// + /// # Arguments + /// * `enabled` - Whether tracing is enabled + /// - `Some(true)` - Force enable tracing + /// - `Some(false)` - Force disable tracing + /// - `None` - Check `OTEL_TRACING_ENABLED` env var (defaults to true if not set) + /// + /// Other parameters are read from environment variables: + /// - `TRACE_FLUSH_INTERVAL_MS` - Flush interval in milliseconds (default: 1000) + /// - `OTEL_COLLECTOR_URL` - OTEL collector endpoint (default: http://localhost:9903/v1/traces) + pub fn new(enabled: Option) -> Self { + let flush_interval_ms = std::env::var("TRACE_FLUSH_INTERVAL_MS") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(1000); + + let otel_url = std::env::var("OTEL_COLLECTOR_URL") + .unwrap_or_else(|_| "http://localhost:9903/v1/traces".to_string()); + + // Determine if tracing is enabled: + // 1. Use explicit parameter if provided + // 2. Otherwise check OTEL_TRACING_ENABLED env var + // 3. Default to false if neither is set (tracing opt-in, not opt-out) + let enabled = enabled.unwrap_or_else(|| { + std::env::var("OTEL_TRACING_ENABLED") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(false) + }); + + debug!( + "TraceCollector initialized: flush_interval={}ms, url={}, enabled={}", + flush_interval_ms, otel_url, enabled + ); + + Self { + spans_by_service: Arc::new(Mutex::new(HashMap::new())), + flush_interval: Duration::from_millis(flush_interval_ms), + otel_url, + enabled, + } + } + + /// Record a span for a specific service + /// + /// # Arguments + /// * `service_name` - Name of the service (e.g., "archgw(routing)", "archgw(llm)") + /// * `span` - The span to record + pub fn record_span(&self, service_name: impl Into, span: Span) { + // Skip recording if tracing is disabled + if !self.enabled { + return; + } + + let service_name = service_name.into(); + + // Use try_lock to avoid blocking in async contexts + // If the lock is held, we skip recording (telemetry shouldn't block the app) + if let Ok(mut spans_by_service) = self.spans_by_service.try_lock() { + // Get or create the queue for this service + let spans = spans_by_service + .entry(service_name) + .or_insert_with(VecDeque::new); + + spans.push_back(span); + } else { + // Lock contention - skip recording this span + debug!("Skipped span recording due to lock contention"); + } + // Flushing is handled by the periodic background flusher (see `start_background_flusher`). + } + + /// Flush all buffered spans to the OTEL collector + /// Builds ResourceSpans for each service with spans + pub async fn flush(&self) -> Result<(), Box> { + // Skip flushing if tracing is disabled + if !self.enabled { + return Ok(()); + } + + let mut spans_by_service = self.spans_by_service.lock().await; + + if spans_by_service.is_empty() { + return Ok(()); + } + + // Snapshot and drain all services' spans + let service_batches: Vec<(String, Vec)> = spans_by_service + .iter_mut() + .filter_map(|(service_name, spans)| { + if spans.is_empty() { + None + } else { + Some((service_name.clone(), spans.drain(..).collect())) + } + }) + .collect(); + + drop(spans_by_service); // Release lock before HTTP call + + if service_batches.is_empty() { + return Ok(()); + } + + let total_spans: usize = service_batches.iter().map(|(_, spans)| spans.len()).sum(); + debug!("Flushing {} spans across {} services to OTEL collector", total_spans, service_batches.len()); + + // Build canonical OTEL payload structure - one ResourceSpan per service + let resource_spans = self.build_resource_spans(service_batches); + + match self.send_to_otel(resource_spans).await { + Ok(_) => { + debug!("Successfully flushed {} spans", total_spans); + Ok(()) + } + Err(e) => { + warn!("Failed to send spans to OTEL collector: {:?}", e); + Err(e) + } + } + } + + /// Build OTEL-compliant resource spans from collected spans, one ResourceSpan per service + fn build_resource_spans(&self, service_batches: Vec<(String, Vec)>) -> Vec { + service_batches + .into_iter() + .map(|(service_name, spans)| { + ResourceSpanBuilder::new(&service_name) + .add_spans(spans) + .build() + }) + .collect() + } + + /// Send resource spans to OTEL collector + /// Serializes as {"resourceSpans": [...]} per OTEL spec + async fn send_to_otel( + &self, + resource_spans: Vec, + ) -> Result<(), Box> { + let client = reqwest::Client::new(); + + // Create OTEL payload with proper structure + let payload = serde_json::json!({ + "resourceSpans": resource_spans + }); + + let response = client + .post(&self.otel_url) + .header("Content-Type", "application/json") + .json(&payload) + .timeout(Duration::from_secs(5)) + .send() + .await?; + + if !response.status().is_success() { + warn!( + "OTEL collector returned non-success status: {}", + response.status() + ); + return Err(format!("OTEL collector error: {}", response.status()).into()); + } + + Ok(()) + } + + /// Start a background task that periodically flushes traces + /// Returns a join handle that can be used to stop the flusher + pub fn start_background_flusher(self: Arc) -> tokio::task::JoinHandle<()> { + let flush_interval = self.flush_interval; + + tokio::spawn(async move { + let mut ticker = interval(flush_interval); + + loop { + ticker.tick().await; + + if let Err(e) = self.flush().await { + error!("Background trace flush failed: {:?}", e); + } + } + }) + } + + /// Get current number of buffered spans across all services (for testing/monitoring) + pub async fn buffered_count(&self) -> usize { + self.spans_by_service + .lock() + .await + .values() + .map(|spans| spans.len()) + .sum() + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::traces::SpanBuilder; + + #[tokio::test] + async fn test_collector_basic() { + let collector = TraceCollector::new(Some(true)); + + let span = SpanBuilder::new("test_operation") + .with_trace_id("abc123") + .build(); + + collector.record_span("test-service", span); + + assert_eq!(collector.buffered_count().await, 1); + } + + #[tokio::test] + async fn test_collector_auto_flush() { + // Since batch-triggered flush behavior was removed, record two spans and verify both are buffered + let collector = Arc::new(TraceCollector::new(Some(true))); + + let span1 = SpanBuilder::new("test1").build(); + let span2 = SpanBuilder::new("test2").build(); + + collector.record_span("test-service", span1); + collector.record_span("test-service", span2); + + // With no batch-triggered flush, both spans should remain buffered + assert_eq!(collector.buffered_count().await, 2); + } +} diff --git a/crates/common/src/traces/constants.rs b/crates/common/src/traces/constants.rs new file mode 100644 index 00000000..09bdecd5 --- /dev/null +++ b/crates/common/src/traces/constants.rs @@ -0,0 +1,27 @@ +/// OpenTelemetry semantic convention constants for tracing +/// +/// These constants ensure consistency across the codebase and prevent typos + +/// Resource attribute keys following OTEL semantic conventions +pub mod resource { + /// Logical name of the service + pub const SERVICE_NAME: &str = "service.name"; + + /// Version of the service + pub const SERVICE_VERSION: &str = "service.version"; + + /// Service namespace/environment + pub const SERVICE_NAMESPACE: &str = "service.namespace"; + + /// Service instance ID + pub const SERVICE_INSTANCE_ID: &str = "service.instance.id"; +} + +/// Instrumentation scope defaults +pub mod scope { + /// Default scope name for tracing instrumentation + pub const DEFAULT_NAME: &str = "brightstaff.tracing"; + + /// Default scope version + pub const DEFAULT_VERSION: &str = "1.0.0"; +} diff --git a/crates/common/src/traces/mod.rs b/crates/common/src/traces/mod.rs new file mode 100644 index 00000000..c0d042fa --- /dev/null +++ b/crates/common/src/traces/mod.rs @@ -0,0 +1,26 @@ +// Original tracing types (OTEL structures) +mod shapes; +// New tracing utilities +mod span_builder; +mod resource_span_builder; +mod constants; + +#[cfg(feature = "trace-collection")] +mod collector; + +#[cfg(all(test, feature = "trace-collection"))] +mod tests; + +// Re-export original types +pub use shapes::{ + Span, Event, Traceparent, TraceparentNewError, + ResourceSpan, Resource, ScopeSpan, Scope, Attribute, AttributeValue, +}; + +// Re-export new utilities +pub use span_builder::{SpanBuilder, SpanKind}; +pub use resource_span_builder::ResourceSpanBuilder; +pub use constants::*; + +#[cfg(feature = "trace-collection")] +pub use collector::{TraceCollector, parse_traceparent}; diff --git a/crates/common/src/traces/resource_span_builder.rs b/crates/common/src/traces/resource_span_builder.rs new file mode 100644 index 00000000..3e0dd88f --- /dev/null +++ b/crates/common/src/traces/resource_span_builder.rs @@ -0,0 +1,121 @@ +use super::shapes::{ResourceSpan, Resource, ScopeSpan, Scope, Span, Attribute, AttributeValue}; +use super::constants::{resource, scope}; +use std::collections::HashMap; + +/// Builder for creating OTEL ResourceSpan structures +/// +/// Provides a fluent API for building the resource/scope/span hierarchy +pub struct ResourceSpanBuilder { + service_name: String, + resource_attributes: HashMap, + scope_name: String, + scope_version: String, + spans: Vec, +} + +impl ResourceSpanBuilder { + /// Create a new ResourceSpan builder with service name + pub fn new(service_name: impl Into) -> Self { + Self { + service_name: service_name.into(), + resource_attributes: HashMap::new(), + scope_name: scope::DEFAULT_NAME.to_string(), + scope_version: scope::DEFAULT_VERSION.to_string(), + spans: Vec::new(), + } + } + + /// Add a resource attribute (e.g., deployment.environment, host.name) + pub fn with_resource_attribute(mut self, key: impl Into, value: impl Into) -> Self { + self.resource_attributes.insert(key.into(), value.into()); + self + } + + /// Set the instrumentation scope name + pub fn with_scope_name(mut self, name: impl Into) -> Self { + self.scope_name = name.into(); + self + } + + /// Set the instrumentation scope version + pub fn with_scope_version(mut self, version: impl Into) -> Self { + self.scope_version = version.into(); + self + } + + /// Add a single span + pub fn add_span(mut self, span: Span) -> Self { + self.spans.push(span); + self + } + + /// Add multiple spans + pub fn add_spans(mut self, spans: Vec) -> Self { + self.spans.extend(spans); + self + } + + /// Build the ResourceSpan + pub fn build(self) -> ResourceSpan { + // Build resource attributes + let mut attributes = vec![ + Attribute { + key: resource::SERVICE_NAME.to_string(), + value: AttributeValue { + string_value: Some(self.service_name), + }, + } + ]; + + // Add custom resource attributes + for (key, value) in self.resource_attributes { + attributes.push(Attribute { + key, + value: AttributeValue { + string_value: Some(value), + }, + }); + } + + let resource = Resource { attributes }; + + let scope = Scope { + name: self.scope_name, + version: self.scope_version, + attributes: Vec::new(), + }; + + let scope_span = ScopeSpan { + scope, + spans: self.spans, + }; + + ResourceSpan { + resource, + scope_spans: vec![scope_span], + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::traces::SpanBuilder; + + #[test] + fn test_resource_span_builder() { + let span1 = SpanBuilder::new("operation1").build(); + let span2 = SpanBuilder::new("operation2").build(); + + let resource_span = ResourceSpanBuilder::new("test-service") + .with_resource_attribute("deployment.environment", "production") + .with_scope_name("test-scope") + .add_span(span1) + .add_span(span2) + .build(); + + assert_eq!(resource_span.resource.attributes.len(), 2); // service.name + custom + assert_eq!(resource_span.scope_spans.len(), 1); + assert_eq!(resource_span.scope_spans[0].spans.len(), 2); + } +} diff --git a/crates/common/src/traces/shapes.rs b/crates/common/src/traces/shapes.rs new file mode 100644 index 00000000..5f521767 --- /dev/null +++ b/crates/common/src/traces/shapes.rs @@ -0,0 +1,123 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct ResourceSpan { + pub resource: Resource, + #[serde(rename = "scopeSpans")] + pub scope_spans: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Resource { + pub attributes: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ScopeSpan { + pub scope: Scope, + pub spans: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Scope { + pub name: String, + pub version: String, + pub attributes: Vec, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Span { + #[serde(rename = "traceId")] + pub trace_id: String, + #[serde(rename = "spanId")] + pub span_id: String, + #[serde(rename = "parentSpanId")] + pub parent_span_id: Option, // Optional in case there's no parent span + pub name: String, + #[serde(rename = "startTimeUnixNano")] + pub start_time_unix_nano: String, + #[serde(rename = "endTimeUnixNano")] + pub end_time_unix_nano: String, + pub kind: u32, + pub attributes: Vec, + pub events: Option>, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Event { + #[serde(rename = "timeUnixNano")] + pub time_unix_nano: String, + pub name: String, + pub attributes: Vec, +} + +impl Event { + pub fn new(name: String, time_unix_nano: u128) -> Self { + Event { + time_unix_nano: format!("{}", time_unix_nano), + name, + attributes: Vec::new(), + } + } + + pub fn add_attribute(&mut self, key: String, value: String) { + self.attributes.push(Attribute { + key, + value: AttributeValue { + string_value: Some(value), + }, + }); + } +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct Attribute { + pub key: String, + pub value: AttributeValue, +} + +#[derive(Serialize, Deserialize, Debug, Clone)] +pub struct AttributeValue { + #[serde(rename = "stringValue")] + pub string_value: Option, // Use Option to handle different value types +} + +pub struct Traceparent { + pub version: String, + pub trace_id: String, + pub parent_id: String, + pub flags: String, +} + +impl std::fmt::Display for Traceparent { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{}-{}-{}-{}", + self.version, self.trace_id, self.parent_id, self.flags + ) + } +} + +#[derive(thiserror::Error, Debug)] +pub enum TraceparentNewError { + #[error("Invalid traceparent: \'{0}\'")] + InvalidTraceparent(String), +} + +impl TryFrom for Traceparent { + type Error = TraceparentNewError; + + fn try_from(traceparent: String) -> Result { + let traceparent_tokens: Vec<&str> = traceparent.split("-").collect::>(); + if traceparent_tokens.len() != 4 { + return Err(TraceparentNewError::InvalidTraceparent(traceparent)); + } + Ok(Traceparent { + version: traceparent_tokens[0].to_string(), + trace_id: traceparent_tokens[1].to_string(), + parent_id: traceparent_tokens[2].to_string(), + flags: traceparent_tokens[3].to_string(), + }) + } +} diff --git a/crates/common/src/traces/span_builder.rs b/crates/common/src/traces/span_builder.rs new file mode 100644 index 00000000..187c1678 --- /dev/null +++ b/crates/common/src/traces/span_builder.rs @@ -0,0 +1,193 @@ +use super::shapes::{Span, Attribute, AttributeValue}; +use std::collections::HashMap; +use std::time::SystemTime; + +/// OpenTelemetry span kinds +/// https://opentelemetry.io/docs/specs/otel/trace/api/#spankind +#[derive(Debug, Clone, Copy)] +pub enum SpanKind { + /// Default value. Indicates that the span represents an internal operation within an application + Internal = 0, + /// Indicates that the span describes a request to some remote service + Client = 3, +} + +/// Builder for creating OTEL-compliant spans with a fluent API +/// +/// This is the recommended way to create spans with proper trace context. +/// +/// # Example +/// ```no_run +/// use common::traces::{SpanBuilder, SpanKind}; +/// use std::time::SystemTime; +/// +/// let span = SpanBuilder::new("router_chat") +/// .with_trace_id("abc123") +/// .with_parent_span_id("parent456") +/// .with_kind(SpanKind::Internal) +/// .with_attribute("http.method", "POST") +/// .with_attribute("http.path", "/v1/chat/completions") +/// .build(); +/// ``` +pub struct SpanBuilder { + name: String, + trace_id: Option, + parent_span_id: Option, + start_time: SystemTime, + end_time: Option, + kind: SpanKind, + attributes: HashMap, +} + +impl SpanBuilder { + /// Create a new span builder + /// + /// # Arguments + /// * `name` - The operation name for this span (e.g., "router_chat", "determine_route") + pub fn new(name: impl Into) -> Self { + Self { + name: name.into(), + trace_id: None, + parent_span_id: None, + start_time: SystemTime::now(), + end_time: None, + kind: SpanKind::Internal, + attributes: HashMap::new(), + } + } + + /// Set the trace ID (extracted from traceparent or OpenTelemetry context) + pub fn with_trace_id(mut self, trace_id: impl Into) -> Self { + self.trace_id = Some(trace_id.into()); + self + } + + /// Set the parent span ID to link this span to its parent + pub fn with_parent_span_id(mut self, parent_span_id: impl Into) -> Self { + self.parent_span_id = Some(parent_span_id.into()); + self + } + + /// Set the span kind (defaults to Internal) + pub fn with_kind(mut self, kind: SpanKind) -> Self { + self.kind = kind; + self + } + + /// Set explicit start time (defaults to now) + pub fn with_start_time(mut self, start_time: SystemTime) -> Self { + self.start_time = start_time; + self + } + + /// Set explicit end time (defaults to build time) + pub fn with_end_time(mut self, end_time: SystemTime) -> Self { + self.end_time = Some(end_time); + self + } + + /// Add a single attribute to the span + pub fn with_attribute(mut self, key: impl Into, value: impl Into) -> Self { + self.attributes.insert(key.into(), value.into()); + self + } + + /// Add multiple attributes at once + pub fn with_attributes(mut self, attrs: HashMap) -> Self { + self.attributes.extend(attrs); + self + } + + /// Build the span, consuming the builder + /// + /// Creates a complete OTEL-compliant span with all provided attributes, + /// generating span_id and using provided or random trace_id. + pub fn build(self) -> Span { + let end_time = self.end_time.unwrap_or_else(SystemTime::now); + + let start_nanos = system_time_to_nanos(self.start_time); + let end_nanos = system_time_to_nanos(end_time); + + // Generate trace_id if not provided + let trace_id = self.trace_id.unwrap_or_else(|| generate_random_trace_id()); + + // Create attributes in OTEL format + let attributes: Vec = self.attributes + .into_iter() + .map(|(key, value)| Attribute { + key, + value: AttributeValue { + string_value: Some(value), + }, + }) + .collect(); + + // Build span directly without going through Span::new() + Span { + trace_id, + span_id: generate_random_span_id(), + parent_span_id: self.parent_span_id, + name: self.name, + start_time_unix_nano: format!("{}", start_nanos), + end_time_unix_nano: format!("{}", end_nanos), + kind: self.kind as u32, + attributes, + events: None, + } + } +} + +/// Convert SystemTime to nanoseconds since UNIX epoch for OTEL +fn system_time_to_nanos(time: SystemTime) -> u128 { + time.duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos() +} + +/// Generate a random span ID (16 hex characters = 8 bytes) +fn generate_random_span_id() -> String { + use rand::RngCore; + let mut rng = rand::thread_rng(); + let mut random_bytes = [0u8; 8]; + rng.fill_bytes(&mut random_bytes); + hex::encode(random_bytes) +} + +/// Generate a random trace ID (32 hex characters = 16 bytes) +fn generate_random_trace_id() -> String { + use rand::RngCore; + let mut rng = rand::thread_rng(); + let mut random_bytes = [0u8; 16]; + rng.fill_bytes(&mut random_bytes); + hex::encode(random_bytes) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_span_builder_basic() { + let span = SpanBuilder::new("test_operation") + .with_trace_id("abc123") + .with_parent_span_id("parent123") + .with_attribute("key", "value") + .build(); + + assert_eq!(span.name, "test_operation"); + assert_eq!(span.trace_id, "abc123"); + assert_eq!(span.parent_span_id, Some("parent123".to_string())); + assert_eq!(span.attributes.len(), 1); + } + + #[test] + fn test_span_builder_no_parent() { + let span = SpanBuilder::new("root_span") + .with_trace_id("xyz789") + .build(); + + assert_eq!(span.name, "root_span"); + assert_eq!(span.trace_id, "xyz789"); + assert_eq!(span.parent_span_id, None); + } +} diff --git a/crates/common/src/traces/tests/mock_otel_collector.rs b/crates/common/src/traces/tests/mock_otel_collector.rs new file mode 100644 index 00000000..8a154145 --- /dev/null +++ b/crates/common/src/traces/tests/mock_otel_collector.rs @@ -0,0 +1,101 @@ +//! Mock OTEL Collector for testing trace output +//! +//! This module provides a simple HTTP server that mimics an OTEL collector. +//! It exposes three endpoints: +//! - POST /v1/traces: Capture incoming OTLP JSON payloads +//! - GET /v1/traces: Return all captured payloads as JSON array +//! - DELETE /v1/traces: Clear all captured payloads +//! +//! Each test creates its own MockOtelCollector instance. + +use axum::{ + extract::State, + http::StatusCode, + routing::{delete, get, post}, + Json, Router, +}; +use serde_json::Value; +use std::sync::Arc; +use tokio::sync::RwLock; + +type SharedTraces = Arc>>; + +/// POST /v1/traces - capture incoming OTLP payload +async fn post_traces( + State(traces): State, + Json(payload): Json, +) -> StatusCode { + traces.write().await.push(payload); + StatusCode::OK +} + +/// GET /v1/traces - return all captured payloads +async fn get_traces(State(traces): State) -> Json> { + Json(traces.read().await.clone()) +} + +/// DELETE /v1/traces - clear all captured payloads +async fn delete_traces(State(traces): State) -> StatusCode { + traces.write().await.clear(); + StatusCode::NO_CONTENT +} + +/// Mock OTEL collector server +pub struct MockOtelCollector { + address: String, + client: reqwest::Client, + #[allow(dead_code)] + server_handle: tokio::task::JoinHandle<()>, +} + +impl MockOtelCollector { + /// Create and start a new mock collector on a random port + pub async fn start() -> Self { + let traces = Arc::new(RwLock::new(Vec::new())); + + let app = Router::new() + .route("/v1/traces", post(post_traces)) + .route("/v1/traces", get(get_traces)) + .route("/v1/traces", delete(delete_traces)) + .with_state(traces.clone()); + + let listener = tokio::net::TcpListener::bind("127.0.0.1:0") + .await + .expect("Failed to bind to random port"); + + let addr = listener.local_addr().expect("Failed to get local address"); + let address = format!("http://127.0.0.1:{}", addr.port()); + + let server_handle = tokio::spawn(async move { + axum::serve(listener, app) + .await + .expect("Server failed"); + }); + + // Give server a moment to start + tokio::time::sleep(tokio::time::Duration::from_millis(50)).await; + + Self { + address, + client: reqwest::Client::new(), + server_handle, + } + } + + /// Get the address of the collector + pub fn address(&self) -> &str { + &self.address + } + + /// GET /v1/traces - fetch all captured payloads + pub async fn get_traces(&self) -> Vec { + self.client + .get(format!("{}/v1/traces", self.address)) + .send() + .await + .expect("Failed to GET traces") + .json() + .await + .expect("Failed to parse traces JSON") + } +} diff --git a/crates/common/src/traces/tests/mod.rs b/crates/common/src/traces/tests/mod.rs new file mode 100644 index 00000000..7bba42f8 --- /dev/null +++ b/crates/common/src/traces/tests/mod.rs @@ -0,0 +1,4 @@ +mod mock_otel_collector; +mod trace_integration_test; + +pub use mock_otel_collector::MockOtelCollector; diff --git a/crates/common/src/traces/tests/trace_integration_test.rs b/crates/common/src/traces/tests/trace_integration_test.rs new file mode 100644 index 00000000..a3c8a6ba --- /dev/null +++ b/crates/common/src/traces/tests/trace_integration_test.rs @@ -0,0 +1,304 @@ +//! Integration tests for OpenTelemetry tracing in router.rs +//! +//! These tests validate that the spans created for LLM requests contain +//! all expected attributes and events by checking the raw JSON payloads +//! sent to the mock OTEL collector. +//! +//! ## Test Design +//! Each test creates its own MockOtelCollector and TraceCollector: +//! 1. Start MockOtelCollector on random port +//! 2. Create TraceCollector with 500ms flush interval +//! 3. Record spans using TraceCollector +//! 4. Flush and wait (500ms + 200ms buffer = 700ms total) for spans to arrive +//! 5. Get raw JSON payloads (GET /v1/traces) and validate structure +//! 6. Test cleanup happens automatically when collectors are dropped +//! +//! ## Serial Execution +//! Tests use the `#[serial]` attribute to run sequentially because they +//! use global environment variables (OTEL_COLLECTOR_URL, OTEL_TRACING_ENABLED, +//! TRACE_FLUSH_INTERVAL_MS). This ensures test isolation without requiring +//! the `--test-threads=1` command line flag. + +const FLUSH_INTERVAL_MS: u64 = 50; +const FLUSH_BUFFER_MS: u64 = 50; +const TOTAL_WAIT_MS: u64 = FLUSH_INTERVAL_MS + FLUSH_BUFFER_MS; + +use crate::traces::{SpanBuilder, SpanKind, TraceCollector}; +use serde_json::Value; +use serial_test::serial; +use std::sync::Arc; + +use super::MockOtelCollector; + +/// Helper to extract all spans from OTLP JSON payloads +fn extract_spans(payloads: &[Value]) -> Vec<&Value> { + let mut spans = Vec::new(); + for payload in payloads { + if let Some(resource_spans) = payload.get("resourceSpans").and_then(|v| v.as_array()) { + for resource_span in resource_spans { + if let Some(scope_spans) = resource_span.get("scopeSpans").and_then(|v| v.as_array()) { + for scope_span in scope_spans { + if let Some(span_list) = scope_span.get("spans").and_then(|v| v.as_array()) { + spans.extend(span_list.iter()); + } + } + } + } + } + } + spans +} + +/// Helper to get string attribute value from a span +fn get_string_attr<'a>(span: &'a Value, key: &str) -> Option<&'a str> { + span.get("attributes") + .and_then(|attrs| attrs.as_array()) + .and_then(|attrs| { + attrs.iter().find(|attr| { + attr.get("key").and_then(|k| k.as_str()) == Some(key) + }) + }) + .and_then(|attr| attr.get("value")) + .and_then(|v| v.get("stringValue")) + .and_then(|v| v.as_str()) +} + +#[tokio::test] +#[serial] +async fn test_llm_span_contains_basic_attributes() { + // Start mock OTEL collector + let mock_collector = MockOtelCollector::start().await; + + // Create TraceCollector pointing to mock with 500ms flush intervalc + std::env::set_var("OTEL_COLLECTOR_URL", format!("{}/v1/traces", mock_collector.address())); + std::env::set_var("OTEL_TRACING_ENABLED", "true"); + std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); + let trace_collector = Arc::new(TraceCollector::new(Some(true))); + + // Create a test span simulating router.rs behavior + let span = SpanBuilder::new("POST /v1/chat/completions >> /v1/chat/completions") + .with_kind(SpanKind::Client) + .with_trace_id("test-trace-123") + .with_attribute("http.method", "POST") + .with_attribute("http.target", "/v1/chat/completions") + .with_attribute("http.upstream_target", "/v1/chat/completions") + .with_attribute("llm.model", "gpt-4o") + .with_attribute("llm.provider", "openai") + .with_attribute("llm.is_streaming", "true") + .with_attribute("llm.temperature", "0.7") + .build(); + + trace_collector.record_span("archgw(llm)", span); + + // Flush and wait for spans to arrive (500ms flush interval + 200ms buffer) + trace_collector.flush().await.expect("Failed to flush"); + tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; + + let payloads = mock_collector.get_traces().await; + let spans = extract_spans(&payloads); + + assert_eq!(spans.len(), 1, "Expected exactly one span"); + + let span = spans[0]; + // Validate HTTP attributes + assert_eq!(get_string_attr(span, "http.method"), Some("POST")); + assert_eq!(get_string_attr(span, "http.target"), Some("/v1/chat/completions")); + + // Validate LLM attributes + assert_eq!(get_string_attr(span, "llm.model"), Some("gpt-4o")); + assert_eq!(get_string_attr(span, "llm.provider"), Some("openai")); + assert_eq!(get_string_attr(span, "llm.is_streaming"), Some("true")); + assert_eq!(get_string_attr(span, "llm.temperature"), Some("0.7")); +} + +#[tokio::test] +#[serial] +async fn test_llm_span_contains_tool_information() { + let mock_collector = MockOtelCollector::start().await; + std::env::set_var("OTEL_COLLECTOR_URL", format!("{}/v1/traces", mock_collector.address())); + std::env::set_var("OTEL_TRACING_ENABLED", "true"); + std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); + let trace_collector = Arc::new(TraceCollector::new(Some(true))); + + let tools_formatted = "get_weather(...)\nsearch_web(...)\ncalculate(...)"; + + let span = SpanBuilder::new("POST /v1/chat/completions") + .with_trace_id("test-trace-tools") + .with_attribute("llm.request.tools", tools_formatted) + .with_attribute("llm.model", "gpt-4o") + .build(); + + trace_collector.record_span("archgw(llm)", span); + trace_collector.flush().await.expect("Failed to flush"); + tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; + + let payloads = mock_collector.get_traces().await; + let spans = extract_spans(&payloads); + + assert!(!spans.is_empty(), "No spans captured"); + + let span = spans[0]; + let tools = get_string_attr(span, "llm.request.tools"); + + assert!(tools.is_some(), "Tools attribute missing"); + assert!(tools.unwrap().contains("get_weather(...)")); + assert!(tools.unwrap().contains("search_web(...)")); + assert!(tools.unwrap().contains("calculate(...)")); + assert!(tools.unwrap().contains('\n'), "Tools should be newline-separated"); +} + +#[tokio::test] +#[serial] +async fn test_llm_span_contains_user_message_preview() { + let mock_collector = MockOtelCollector::start().await; + std::env::set_var("OTEL_COLLECTOR_URL", format!("{}/v1/traces", mock_collector.address())); + std::env::set_var("OTEL_TRACING_ENABLED", "true"); + std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); + let trace_collector = Arc::new(TraceCollector::new(Some(true))); + + let long_message = "This is a very long user message that should be truncated to 50 characters in the span"; + let preview = if long_message.len() > 50 { + format!("{}...", &long_message[..50]) + } else { + long_message.to_string() + }; + + let span = SpanBuilder::new("POST /v1/messages") + .with_trace_id("test-trace-preview") + .with_attribute("llm.request.user_message_preview", &preview) + .build(); + + trace_collector.record_span("archgw(llm)", span); + trace_collector.flush().await.expect("Failed to flush"); + tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; + + let payloads = mock_collector.get_traces().await; + let spans = extract_spans(&payloads); + let span = spans[0]; + + let message_preview = get_string_attr(span, "llm.request.user_message_preview"); + + assert!(message_preview.is_some()); + assert!(message_preview.unwrap().len() <= 53); // 50 chars + "..." + assert!(message_preview.unwrap().contains("...")); +} + +#[tokio::test] +#[serial] +async fn test_llm_span_contains_time_to_first_token() { + let mock_collector = MockOtelCollector::start().await; + std::env::set_var("OTEL_COLLECTOR_URL", format!("{}/v1/traces", mock_collector.address())); + std::env::set_var("OTEL_TRACING_ENABLED", "true"); + std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); + let trace_collector = Arc::new(TraceCollector::new(Some(true))); + + let ttft_ms = "245"; // milliseconds as string + + let span = SpanBuilder::new("POST /v1/chat/completions") + .with_trace_id("test-trace-ttft") + .with_attribute("llm.is_streaming", "true") + .with_attribute("llm.time_to_first_token_ms", ttft_ms) + .build(); + + trace_collector.record_span("archgw(llm)", span); + trace_collector.flush().await.expect("Failed to flush"); + tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; + + let payloads = mock_collector.get_traces().await; + let spans = extract_spans(&payloads); + let span = spans[0]; + + // Check TTFT attribute + let ttft_attr = get_string_attr(span, "llm.time_to_first_token_ms"); + assert_eq!(ttft_attr, Some("245")); +} + +#[tokio::test] +#[serial] +async fn test_llm_span_contains_upstream_path() { + let mock_collector = MockOtelCollector::start().await; + std::env::set_var("OTEL_COLLECTOR_URL", format!("{}/v1/traces", mock_collector.address())); + std::env::set_var("OTEL_TRACING_ENABLED", "true"); + std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); + let trace_collector = Arc::new(TraceCollector::new(Some(true))); + + // Test Zhipu provider with path transformation + let span = SpanBuilder::new("POST /v1/chat/completions >> /api/paas/v4/chat/completions") + .with_trace_id("test-trace-upstream") + .with_attribute("http.upstream_target", "/api/paas/v4/chat/completions") + .with_attribute("llm.provider", "zhipu") + .with_attribute("llm.model", "glm-4") + .build(); + + trace_collector.record_span("archgw(llm)", span); + trace_collector.flush().await.expect("Failed to flush"); + tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; + + let payloads = mock_collector.get_traces().await; + let spans = extract_spans(&payloads); + let span = spans[0]; + + // Operation name should show the transformation + let name = span.get("name").and_then(|v| v.as_str()); + assert!(name.is_some()); + assert!(name.unwrap().contains(">>"), "Operation name should show path transformation"); + + // Check upstream target attribute + let upstream = get_string_attr(span, "http.upstream_target"); + assert_eq!(upstream, Some("/api/paas/v4/chat/completions")); +} + +#[tokio::test] +#[serial] +async fn test_llm_span_multiple_services() { + let mock_collector = MockOtelCollector::start().await; + std::env::set_var("OTEL_COLLECTOR_URL", format!("{}/v1/traces", mock_collector.address())); + std::env::set_var("OTEL_TRACING_ENABLED", "true"); + std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); + let trace_collector = Arc::new(TraceCollector::new(Some(true))); + + // Create spans for different services + let llm_span = SpanBuilder::new("LLM Request") + .with_trace_id("test-multi") + .with_attribute("service", "llm") + .build(); + + let routing_span = SpanBuilder::new("Routing Decision") + .with_trace_id("test-multi") + .with_attribute("service", "routing") + .build(); + + trace_collector.record_span("archgw(llm)", llm_span); + trace_collector.record_span("archgw(routing)", routing_span); + trace_collector.flush().await.expect("Failed to flush"); + tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; + + let payloads = mock_collector.get_traces().await; + let all_spans = extract_spans(&payloads); + + assert_eq!(all_spans.len(), 2, "Should have captured both spans"); +} + +#[tokio::test] +#[serial] +async fn test_tracing_disabled_produces_no_spans() { + let mock_collector = MockOtelCollector::start().await; + + // Create TraceCollector with tracing DISABLED + std::env::set_var("OTEL_COLLECTOR_URL", format!("{}/v1/traces", mock_collector.address())); + std::env::set_var("OTEL_TRACING_ENABLED", "false"); + std::env::set_var("TRACE_FLUSH_INTERVAL_MS", "500"); + let trace_collector = Arc::new(TraceCollector::new(Some(false))); + + let span = SpanBuilder::new("Test Span") + .with_trace_id("test-disabled") + .build(); + + trace_collector.record_span("archgw(llm)", span); + trace_collector.flush().await.ok(); // Should be no-op when disabled + tokio::time::sleep(tokio::time::Duration::from_millis(TOTAL_WAIT_MS)).await; + + let payloads = mock_collector.get_traces().await; + let all_spans = extract_spans(&payloads); + assert_eq!(all_spans.len(), 0, "No spans should be captured when tracing is disabled"); +} diff --git a/crates/hermesllm/src/apis/amazon_bedrock.rs b/crates/hermesllm/src/apis/amazon_bedrock.rs index 252bd0f1..7b4a511f 100644 --- a/crates/hermesllm/src/apis/amazon_bedrock.rs +++ b/crates/hermesllm/src/apis/amazon_bedrock.rs @@ -200,6 +200,17 @@ impl ProviderRequest for ConverseRequest { }) } + fn get_tool_names(&self) -> Option> { + self.tool_config.as_ref()?.tools.as_ref().map(|tools| { + tools + .iter() + .filter_map(|tool| match tool { + Tool::ToolSpec { tool_spec } => Some(tool_spec.name.clone()), + }) + .collect() + }) + } + fn to_bytes(&self) -> Result, ProviderRequestError> { serde_json::to_vec(self).map_err(|e| ProviderRequestError { message: format!("Failed to serialize Bedrock request: {}", e), @@ -218,6 +229,10 @@ impl ProviderRequest for ConverseRequest { false } } + + fn get_temperature(&self) -> Option { + self.inference_config.as_ref()?.temperature + } } // ============================================================================ diff --git a/crates/hermesllm/src/apis/anthropic.rs b/crates/hermesllm/src/apis/anthropic.rs index 7e1951e4..06d632d9 100644 --- a/crates/hermesllm/src/apis/anthropic.rs +++ b/crates/hermesllm/src/apis/anthropic.rs @@ -513,6 +513,12 @@ impl ProviderRequest for MessagesRequest { None } + fn get_tool_names(&self) -> Option> { + self.tools.as_ref().map(|tools| { + tools.iter().map(|tool| tool.name.clone()).collect() + }) + } + fn to_bytes(&self) -> Result, ProviderRequestError> { serde_json::to_vec(self).map_err(|e| ProviderRequestError { message: format!("Failed to serialize MessagesRequest: {}", e), @@ -531,6 +537,10 @@ impl ProviderRequest for MessagesRequest { false } } + + fn get_temperature(&self) -> Option { + self.temperature + } } impl MessagesResponse { diff --git a/crates/hermesllm/src/apis/openai.rs b/crates/hermesllm/src/apis/openai.rs index d7f7a07d..4e006c3a 100644 --- a/crates/hermesllm/src/apis/openai.rs +++ b/crates/hermesllm/src/apis/openai.rs @@ -687,6 +687,32 @@ impl ProviderRequest for ChatCompletionsRequest { }) } + fn get_tool_names(&self) -> Option> { + // First check the 'tools' field (current API) + if let Some(tools) = &self.tools { + let names: Vec = tools + .iter() + .map(|tool| tool.function.name.clone()) + .collect(); + if !names.is_empty() { + return Some(names); + } + } + + // Fallback to 'functions' field (deprecated but still supported) + if let Some(functions) = &self.functions { + let names: Vec = functions + .iter() + .map(|func| func.function.name.clone()) + .collect(); + if !names.is_empty() { + return Some(names); + } + } + + None + } + fn to_bytes(&self) -> Result, ProviderRequestError> { serde_json::to_vec(&self).map_err(|e| ProviderRequestError { message: format!("Failed to serialize OpenAI request: {}", e), @@ -705,6 +731,10 @@ impl ProviderRequest for ChatCompletionsRequest { false } } + + fn get_temperature(&self) -> Option { + self.temperature + } } /// Implementation of ProviderResponse for ChatCompletionsResponse diff --git a/crates/hermesllm/src/apis/openai_responses.rs b/crates/hermesllm/src/apis/openai_responses.rs index 4f0cf663..91c4b0cc 100644 --- a/crates/hermesllm/src/apis/openai_responses.rs +++ b/crates/hermesllm/src/apis/openai_responses.rs @@ -1063,6 +1063,19 @@ impl ProviderRequest for ResponsesAPIRequest { } } + fn get_tool_names(&self) -> Option> { + self.tools.as_ref().map(|tools| { + tools + .iter() + .filter_map(|tool| match tool { + Tool::Function { name, .. } => Some(name.clone()), + // Other tool types don't have user-defined names + _ => None, + }) + .collect() + }) + } + fn to_bytes(&self) -> Result, ProviderRequestError> { serde_json::to_vec(&self).map_err(|e| ProviderRequestError { message: format!("Failed to serialize Responses API request: {}", e), @@ -1081,6 +1094,10 @@ impl ProviderRequest for ResponsesAPIRequest { false } } + + fn get_temperature(&self) -> Option { + self.temperature + } } // ============================================================================ diff --git a/crates/hermesllm/src/providers/request.rs b/crates/hermesllm/src/providers/request.rs index daeebe70..eb8f0788 100644 --- a/crates/hermesllm/src/providers/request.rs +++ b/crates/hermesllm/src/providers/request.rs @@ -35,6 +35,9 @@ pub trait ProviderRequest: Send + Sync { /// Extract the user message for tracing/logging purposes fn get_recent_user_message(&self) -> Option; + /// Get tool names if tools are defined in the request + fn get_tool_names(&self) -> Option>; + /// Convert the request to bytes for transmission fn to_bytes(&self) -> Result, ProviderRequestError>; @@ -42,6 +45,8 @@ pub trait ProviderRequest: Send + Sync { /// Remove a metadata key from the request and return true if the key was present fn remove_metadata_key(&mut self, key: &str) -> bool; + + fn get_temperature(&self) -> Option; } impl ProviderRequest for ProviderRequestType { @@ -95,6 +100,16 @@ impl ProviderRequest for ProviderRequestType { } } + fn get_tool_names(&self) -> Option> { + match self { + Self::ChatCompletionsRequest(r) => r.get_tool_names(), + Self::MessagesRequest(r) => r.get_tool_names(), + Self::BedrockConverse(r) => r.get_tool_names(), + Self::BedrockConverseStream(r) => r.get_tool_names(), + Self::ResponsesAPIRequest(r) => r.get_tool_names(), + } + } + fn to_bytes(&self) -> Result, ProviderRequestError> { match self { Self::ChatCompletionsRequest(r) => r.to_bytes(), @@ -124,6 +139,16 @@ impl ProviderRequest for ProviderRequestType { Self::ResponsesAPIRequest(r) => r.remove_metadata_key(key), } } + + fn get_temperature(&self) -> Option { + match self { + Self::ChatCompletionsRequest(r) => r.get_temperature(), + Self::MessagesRequest(r) => r.get_temperature(), + Self::BedrockConverse(r) => r.get_temperature(), + Self::BedrockConverseStream(r) => r.get_temperature(), + Self::ResponsesAPIRequest(r) => r.get_temperature(), + } + } } /// Parse the client API from a byte slice. diff --git a/crates/llm_gateway/src/filter_context.rs b/crates/llm_gateway/src/filter_context.rs index 2b8e1a95..3a1f7b84 100644 --- a/crates/llm_gateway/src/filter_context.rs +++ b/crates/llm_gateway/src/filter_context.rs @@ -2,26 +2,18 @@ use crate::metrics::Metrics; use crate::stream_context::StreamContext; use common::configuration::Configuration; use common::configuration::Overrides; -use common::consts::OTEL_COLLECTOR_HTTP; -use common::consts::OTEL_POST_PATH; -use common::http::CallArgs; use common::http::Client; use common::llm_providers::LlmProviders; use common::ratelimit; use common::stats::Gauge; -use common::tracing::TraceData; use log::trace; -use log::warn; use proxy_wasm::traits::*; use proxy_wasm::types::*; use std::cell::RefCell; use std::collections::HashMap; -use std::collections::VecDeque; use std::rc::Rc; use std::time::Duration; -use std::sync::{Arc, Mutex}; - #[derive(Debug)] pub struct CallContext {} @@ -31,7 +23,6 @@ pub struct FilterContext { // callouts stores token_id to request mapping that we use during #on_http_call_response to match the response to the request. callouts: RefCell>, llm_providers: Option>, - traces_queue: Arc>>, overrides: Rc>, } @@ -41,7 +32,6 @@ impl FilterContext { callouts: RefCell::new(HashMap::new()), metrics: Rc::new(Metrics::new()), llm_providers: None, - traces_queue: Arc::new(Mutex::new(VecDeque::new())), overrides: Rc::new(None), } } @@ -95,7 +85,6 @@ impl RootContext for FilterContext { .as_ref() .expect("LLM Providers must exist when Streams are being created"), ), - Arc::clone(&self.traces_queue), Rc::clone(&self.overrides), ))) } @@ -108,34 +97,6 @@ impl RootContext for FilterContext { self.set_tick_period(Duration::from_secs(1)); true } - - fn on_tick(&mut self) { - let _ = self.traces_queue.try_lock().map(|mut traces_queue| { - while let Some(trace) = traces_queue.pop_front() { - let trace_str = serde_json::to_string(&trace).unwrap(); - trace!("trace details: {}", trace_str); - let call_args = CallArgs::new( - OTEL_COLLECTOR_HTTP, - OTEL_POST_PATH, - vec![ - (":method", http::Method::POST.as_str()), - (":path", OTEL_POST_PATH), - (":authority", OTEL_COLLECTOR_HTTP), - ("content-type", "application/json"), - ], - Some(trace_str.as_bytes()), - vec![], - Duration::from_secs(60), - ); - if let Err(error) = self.http_call(call_args, CallContext {}) { - warn!( - "failed to schedule http call to otel-collector: {:?}", - error - ); - } - } - }); - } } impl Context for FilterContext { diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 42d7cb31..fbbf6c28 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -4,10 +4,8 @@ use log::{debug, info, warn}; use proxy_wasm::hostcalls::get_current_time; use proxy_wasm::traits::*; use proxy_wasm::types::*; -use std::collections::VecDeque; use std::num::NonZero; use std::rc::Rc; -use std::sync::{Arc, Mutex}; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use crate::metrics::Metrics; @@ -20,7 +18,6 @@ use common::errors::ServerError; use common::llm_providers::LlmProviders; use common::ratelimit::Header; use common::stats::{IncrementingMetric, RecordingMetric}; -use common::tracing::{Event, Span, TraceData, Traceparent}; use common::{ratelimit, routing, tokenizer}; use hermesllm::apis::streaming_shapes::amazon_bedrock_binary_frame::BedrockBinaryFrameDecoder; use hermesllm::apis::streaming_shapes::sse::{ @@ -51,7 +48,6 @@ pub struct StreamContext { ttft_time: Option, traceparent: Option, request_body_sent_time: Option, - traces_queue: Arc>>, overrides: Rc>, user_message: Option, upstream_status_code: Option, @@ -65,7 +61,6 @@ impl StreamContext { pub fn new( metrics: Rc, llm_providers: Rc, - traces_queue: Arc>>, overrides: Rc>, ) -> Self { StreamContext { @@ -83,7 +78,6 @@ impl StreamContext { ttft_duration: None, traceparent: None, ttft_time: None, - traces_queue, request_body_sent_time: None, user_message: None, upstream_status_code: None, @@ -333,68 +327,6 @@ impl StreamContext { self.metrics .output_sequence_length .record(self.response_tokens as u64); - - if let Some(traceparent) = self.traceparent.as_ref() { - let current_time_ns = current_time_ns(); - - match Traceparent::try_from(traceparent.to_string()) { - Err(e) => { - warn!("traceparent header is invalid: {}", e); - } - Ok(traceparent) => { - let service_name = match &self.resolved_api { - Some(api) => { - let api_display = api.to_string(); - format!("archgw.{}", api_display) - } - None => "archgw".to_string(), - }; - - let mut trace_data = - common::tracing::TraceData::new_with_service_name(service_name); - let mut llm_span = Span::new( - self.llm_provider().name.to_string(), - Some(traceparent.trace_id), - Some(traceparent.parent_id), - self.request_body_sent_time.unwrap(), - current_time_ns, - ); - llm_span - .add_attribute("model".to_string(), self.llm_provider().name.to_string()); - - if let Some(user_message) = &self.user_message { - llm_span.add_attribute("message".to_string(), user_message.clone()); - } - - // Add HTTP attributes - if let Some(method) = &self.http_method { - llm_span.add_attribute("http.method".to_string(), method.clone()); - } - if let Some(protocol) = &self.http_protocol { - llm_span.add_attribute("http.protocol".to_string(), protocol.clone()); - } - if let Some(status_code) = &self.upstream_status_code { - llm_span.add_attribute( - "http.status_code".to_string(), - status_code.as_u16().to_string(), - ); - } - - // Add request ID attribute - llm_span - .add_attribute("http.request_id".to_string(), self.request_identifier()); - - if self.ttft_time.is_some() { - llm_span.add_event(Event::new( - "time_to_first_token".to_string(), - self.ttft_time.unwrap(), - )); - } - trace_data.add_span(llm_span); - self.traces_queue.lock().unwrap().push_back(trace_data); - } - }; - } } fn read_raw_response_body(&mut self, body_size: usize) -> Result, Action> { diff --git a/demos/samples_python/currency_exchange/arch_config.yaml b/demos/samples_python/currency_exchange/arch_config.yaml index 1c399449..71c06d18 100644 --- a/demos/samples_python/currency_exchange/arch_config.yaml +++ b/demos/samples_python/currency_exchange/arch_config.yaml @@ -8,8 +8,15 @@ listeners: timeout: 30s llm_providers: - - access_key: $OPENAI_API_KEY - model: openai/gpt-4o + - model: openai/gpt-4o-mini + access_key: $OPENAI_API_KEY + default: true + + - model: openai/gpt-4o + access_key: $OPENAI_API_KEY + routing_preferences: + - name: code understanding + description: understand and explain existing code snippets, functions, or libraries endpoints: frankfurther_api: diff --git a/demos/use_cases/model_choice_with_test_harness/pyproject.toml b/demos/use_cases/model_choice_with_test_harness/pyproject.toml index b4a2c1c4..4147942e 100644 --- a/demos/use_cases/model_choice_with_test_harness/pyproject.toml +++ b/demos/use_cases/model_choice_with_test_harness/pyproject.toml @@ -12,7 +12,7 @@ python = ">=3.10,<3.13.3" pydantic = "^2.0" openai = "^1.0" pyyaml = "^6.0" -archgw ="^0.3.21" +archgw ="^0.3.22" [tool.poetry.group.dev.dependencies] pytest = "^8.3" diff --git a/demos/use_cases/preference_based_routing/README.md b/demos/use_cases/preference_based_routing/README.md index 3095edca..139d6252 100644 --- a/demos/use_cases/preference_based_routing/README.md +++ b/demos/use_cases/preference_based_routing/README.md @@ -14,9 +14,9 @@ Make sure your machine is up to date with [latest version of archgw]([url](https 2. start archgw in the foreground ```bash (venv) $ archgw up --service archgw --foreground -2025-05-30 18:00:09,953 - cli.main - INFO - Starting archgw cli version: 0.3.21 +2025-05-30 18:00:09,953 - cli.main - INFO - Starting archgw cli version: 0.3.22 2025-05-30 18:00:09,953 - cli.main - INFO - Validating /Users/adilhafeez/src/intelligent-prompt-gateway/demos/use_cases/preference_based_routing/arch_config.yaml -2025-05-30 18:00:10,422 - cli.core - INFO - Starting arch gateway, image name: archgw, tag: katanemo/archgw:0.3.21 +2025-05-30 18:00:10,422 - cli.core - INFO - Starting arch gateway, image name: archgw, tag: katanemo/archgw:0.3.22 2025-05-30 18:00:10,662 - cli.core - INFO - archgw status: running, health status: starting 2025-05-30 18:00:11,712 - cli.core - INFO - archgw status: running, health status: starting 2025-05-30 18:00:12,761 - cli.core - INFO - archgw is running and is healthy! diff --git a/docs/source/conf.py b/docs/source/conf.py index ab39d03f..11048b61 100644 --- a/docs/source/conf.py +++ b/docs/source/conf.py @@ -15,7 +15,7 @@ from sphinxawesome_theme.postprocess import Icons project = "Arch Docs" copyright = "2025, Katanemo Labs, Inc" author = "Katanemo Labs, Inc" -release = " v0.3.21" +release = " v0.3.22" # -- General configuration --------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#general-configuration diff --git a/docs/source/get_started/quickstart.rst b/docs/source/get_started/quickstart.rst index 4ca25450..546285c2 100644 --- a/docs/source/get_started/quickstart.rst +++ b/docs/source/get_started/quickstart.rst @@ -25,7 +25,7 @@ Arch's CLI allows you to manage and interact with the Arch gateway efficiently. $ python -m venv venv $ source venv/bin/activate # On Windows, use: venv\Scripts\activate - $ pip install archgw==0.3.21 + $ pip install archgw==0.3.22 Build AI Agent with Arch Gateway diff --git a/docs/source/resources/deployment.rst b/docs/source/resources/deployment.rst index e1489a9e..c23e0adc 100644 --- a/docs/source/resources/deployment.rst +++ b/docs/source/resources/deployment.rst @@ -25,7 +25,7 @@ Create a ``docker-compose.yml`` file with the following configuration: # docker-compose.yml services: archgw: - image: katanemo/archgw:0.3.21 + image: katanemo/archgw:0.3.22 container_name: archgw ports: - "10000:10000" # ingress (client -> arch)