diff --git a/arch/Dockerfile b/arch/Dockerfile index f485f938..74cfd40a 100644 --- a/arch/Dockerfile +++ b/arch/Dockerfile @@ -25,5 +25,4 @@ COPY arch/tools/cli/config_generator.py . COPY arch/envoy.template.yaml . COPY arch/arch_config_schema.yaml . - -ENTRYPOINT ["sh", "-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug"] +ENTRYPOINT ["sh", "-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug 2>&1 | tee /var/log/envoy.log"] diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml index cce4e1b1..f08a2b2f 100644 --- a/arch/envoy.template.yaml +++ b/arch/envoy.template.yaml @@ -120,7 +120,7 @@ static_resources: "@type": type.googleapis.com/envoy.extensions.compression.gzip.compressor.v3.Gzip memory_level: 3 window_bits: 10 - - name: envoy.filters.http.wasm + - name: envoy.filters.http.wasm_prompt typed_config: "@type": type.googleapis.com/udpa.type.v1.TypedStruct type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm @@ -137,7 +137,7 @@ static_resources: code: local: filename: "/etc/envoy/proxy-wasm-plugins/prompt_gateway.wasm" - - name: envoy.filters.http.wasm + - name: envoy.filters.http.wasm_llm typed_config: "@type": type.googleapis.com/udpa.type.v1.TypedStruct type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm @@ -498,4 +498,22 @@ static_resources: socket_address: address: host.docker.internal port_value: 4317 + - name: opentelemetry_collector_http + type: STRICT_DNS + dns_lookup_family: V4_ONLY + lb_policy: ROUND_ROBIN + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {} + load_assignment: + cluster_name: opentelemetry_collector_http + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: host.docker.internal + port_value: 4318 {% endif %} diff --git a/crates/Cargo.lock b/crates/Cargo.lock index a9c06a49..98157733 100644 --- a/crates/Cargo.lock +++ b/crates/Cargo.lock @@ -224,6 +224,7 @@ dependencies = [ "derivative", "duration-string", "governor", + "hex", "log", "pretty_assertions", "proxy-wasm", @@ -764,6 +765,12 @@ dependencies = [ "libc", ] +[[package]] +name = "hex" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" + [[package]] name = "http" version = "1.1.0" diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index 4651c610..84aa636c 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -15,6 +15,7 @@ thiserror = "1.0.64" tiktoken-rs = "0.5.9" rand = "0.8.5" serde_json = "1.0" +hex = "0.4.3" [dev-dependencies] pretty_assertions = "1.4.1" diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index 0984bf89..91b52ef3 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -7,8 +7,9 @@ pub mod embeddings; pub mod errors; pub mod http; pub mod llm_providers; +pub mod pii; pub mod ratelimit; pub mod routing; pub mod stats; pub mod tokenizer; -pub mod pii; +pub mod tracing; diff --git a/crates/common/src/tracing.rs b/crates/common/src/tracing.rs new file mode 100644 index 00000000..0d2bb978 --- /dev/null +++ b/crates/common/src/tracing.rs @@ -0,0 +1,177 @@ +use rand::RngCore; +use serde::{Deserialize, Serialize}; + +#[derive(Serialize, Deserialize, Debug)] +pub struct ResourceSpan { + pub resource: Resource, + #[serde(rename = "scopeSpans")] + pub scope_spans: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Resource { + pub attributes: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct ScopeSpan { + scope: Scope, + spans: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +struct Scope { + name: String, + version: String, + attributes: Vec, +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Span { + #[serde(rename = "traceId")] + pub trace_id: String, + #[serde(rename = "spanId")] + pub span_id: String, + #[serde(rename = "parentSpanId")] + pub parent_span_id: Option, // Optional in case there’s no parent span + pub name: String, + #[serde(rename = "startTimeUnixNano")] + pub start_time_unix_nano: String, + #[serde(rename = "endTimeUnixNano")] + pub end_time_unix_nano: String, + pub kind: u32, + pub attributes: Vec, + pub events: Option>, +} + +impl Span { + pub fn new( + name: String, + parent_trace_id: String, + parent_span_id: Option, + start_time_unix_nano: u128, + end_time_unix_nano: u128, + ) -> Self { + Span { + trace_id: parent_trace_id, + span_id: get_random_span_id(), + parent_span_id, + name, + start_time_unix_nano: format!("{}", start_time_unix_nano), + end_time_unix_nano: format!("{}", end_time_unix_nano), + kind: 0, + attributes: Vec::new(), + events: None, + } + } + + pub fn add_attribute(&mut self, key: String, value: String) { + self.attributes.push(Attribute { + key, + value: AttributeValue { + string_value: Some(value), + }, + }); + } + + pub fn add_event(&mut self, event: Event) { + if self.events.is_none() { + self.events = Some(Vec::new()); + } + self.events.as_mut().unwrap().push(event); + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Event { + #[serde(rename = "timeUnixNano")] + pub time_unix_nano: String, + pub name: String, + pub attributes: Vec, +} + +impl Event { + pub fn new(name: String, time_unix_nano: u128) -> Self { + Event { + time_unix_nano: format!("{}", time_unix_nano), + name, + attributes: Vec::new(), + } + } + + pub fn add_attribute(&mut self, key: String, value: String) { + self.attributes.push(Attribute { + key, + value: AttributeValue { + string_value: Some(value), + }, + }); + } +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct Attribute { + key: String, + value: AttributeValue, +} + +#[derive(Serialize, Deserialize, Debug)] +struct AttributeValue { + #[serde(rename = "stringValue")] + string_value: Option, // Use Option to handle different value types +} + +#[derive(Serialize, Deserialize, Debug)] +pub struct TraceData { + #[serde(rename = "resourceSpans")] + resource_spans: Vec, +} + +impl Default for TraceData { + fn default() -> Self { + Self::new() + } +} + +impl TraceData { + pub fn new() -> Self { + TraceData { + resource_spans: Vec::new(), + } + } + + pub fn add_span(&mut self, span: Span) { + if self.resource_spans.is_empty() { + let resource = Resource { + attributes: vec![Attribute { + key: "service.name".to_string(), + value: AttributeValue { + string_value: Some("upstream-llm".to_string()), + }, + }], + }; + let scope_span = ScopeSpan { + scope: Scope { + name: "default".to_string(), + version: "1.0".to_string(), + attributes: Vec::new(), + }, + spans: Vec::new(), + }; + let resource_span = ResourceSpan { + resource, + scope_spans: vec![scope_span], + }; + self.resource_spans.push(resource_span); + } + self.resource_spans[0].scope_spans[0].spans.push(span); + } +} + +pub fn get_random_span_id() -> String { + let mut rng = rand::thread_rng(); + let mut random_bytes = [0u8; 8]; + rng.fill_bytes(&mut random_bytes); + + hex::encode(random_bytes) +} diff --git a/crates/prompt_gateway/src/http_context.rs b/crates/prompt_gateway/src/http_context.rs index f14114e4..3174a597 100644 --- a/crates/prompt_gateway/src/http_context.rs +++ b/crates/prompt_gateway/src/http_context.rs @@ -1,4 +1,7 @@ -use std::{collections::HashMap, time::Duration}; +use std::{ + collections::HashMap, + time::{Duration, SystemTime, UNIX_EPOCH}, +}; use common::{ common_types::{ @@ -13,7 +16,9 @@ use common::{ HEALTHZ_PATH, REQUEST_ID_HEADER, TOOL_ROLE, TRACE_PARENT_HEADER, USER_ROLE, }, errors::ServerError, - http::{CallArgs, Client}, pii::obfuscate_auth_header, + http::{CallArgs, Client}, + pii::obfuscate_auth_header, + tracing::{Event, Span}, }; use http::StatusCode; use log::{debug, trace, warn}; @@ -239,7 +244,7 @@ impl HttpContext for StreamContext { fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action { trace!( - "recv [S={}] bytes={} end_stream={}", + "on_http_response_body: recv [S={}] bytes={} end_stream={}", self.context_id, body_size, end_of_stream @@ -250,6 +255,55 @@ impl HttpContext for StreamContext { return Action::Continue; } + if self.time_to_first_token.is_none() { + self.time_to_first_token = Some( + SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(), + ); + } + + if end_of_stream && body_size == 0 { + if let Some(traceparent) = self.traceparent.as_ref() { + let since_the_epoch_ns = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + + let traceparent_tokens = traceparent.split("-").collect::>(); + if traceparent_tokens.len() != 4 { + warn!("traceparent header is invalid: {}", traceparent); + return Action::Continue; + } + let parent_trace_id = traceparent_tokens[1]; + let parent_span_id = traceparent_tokens[2]; + let mut trace_data = common::tracing::TraceData::new(); + let mut llm_span = Span::new( + "upstream_llm_time".to_string(), + parent_trace_id.to_string(), + Some(parent_span_id.to_string()), + self.start_upstream_llm_request_time, + since_the_epoch_ns, + ); + if let Some(prompt) = self.user_prompt.as_ref() { + if let Some(content) = prompt.content.as_ref() { + llm_span.add_attribute("user_prompt".to_string(), content.to_string()); + } + } + llm_span.add_event(Event::new( + "time_to_first_token".to_string(), + self.time_to_first_token.unwrap(), + )); + trace_data.add_span(llm_span); + + let trace_data_str = serde_json::to_string(&trace_data).unwrap(); + debug!("upstream_llm trace details: {}", trace_data_str); + // send trace_data to http tracing endpoint + } + return Action::Continue; + } + let body = if self.streaming_response { let streaming_chunk = match self.get_http_response_body(0, body_size) { Some(chunk) => chunk, diff --git a/crates/prompt_gateway/src/stream_context.rs b/crates/prompt_gateway/src/stream_context.rs index b3a60875..79dd99a7 100644 --- a/crates/prompt_gateway/src/stream_context.rs +++ b/crates/prompt_gateway/src/stream_context.rs @@ -33,7 +33,7 @@ use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::str::FromStr; -use std::time::Duration; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; #[derive(Debug, Clone)] pub enum ResponseHandlerType { @@ -77,6 +77,8 @@ pub struct StreamContext { pub chat_completions_request: Option, pub prompt_guards: Rc, pub request_id: Option, + pub start_upstream_llm_request_time: u128, + pub time_to_first_token: Option, pub traceparent: Option, pub tracing: Rc>, } @@ -113,6 +115,8 @@ impl StreamContext { request_id: None, traceparent: None, tracing, + start_upstream_llm_request_time: 0, + time_to_first_token: None, } } @@ -1003,6 +1007,11 @@ impl StreamContext { }; debug!("archgw => llm request: {}", llm_request_str); + self.start_upstream_llm_request_time = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_nanos(); + self.set_http_request_body(0, self.request_body_size, &llm_request_str.into_bytes()); self.resume_http_request(); } diff --git a/demos/shared/trace_streamer/Dockerfile b/demos/shared/trace_streamer/Dockerfile new file mode 100644 index 00000000..189c650a --- /dev/null +++ b/demos/shared/trace_streamer/Dockerfile @@ -0,0 +1,11 @@ +FROM python:3.12-slim as arch + +WORKDIR /app + +RUN pip install requests +COPY stream_traces.py . + +RUN mkdir -p /var/log +RUN touch /var/log/envoy.log + +CMD ["python", "stream_traces.py"] diff --git a/demos/shared/trace_streamer/stream_traces.py b/demos/shared/trace_streamer/stream_traces.py new file mode 100644 index 00000000..4f1bf20c --- /dev/null +++ b/demos/shared/trace_streamer/stream_traces.py @@ -0,0 +1,42 @@ +import os +import time +import requests +import logging + +logging.basicConfig( + level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s" +) + + +otel_tracing_endpoint = os.getenv( + "OTEL_TRACING_HTTP_ENDPOINT", "http://localhost:4318/v1/traces" +) +envoy_log_path = os.getenv("ENVOY_LOG_PATH", "/var/log/envoy.log") + +logging.info(f"Using otel-tracing host: {otel_tracing_endpoint}") +logging.info(f"Using envoy log path: {envoy_log_path}") + + +def process_log_line(line): + try: + response = requests.post( + url=otel_tracing_endpoint, + data=line, + headers={"Content-Type": "application/json"}, + ) + logging.info(f"Sent trace to otel-tracing: {response.status_code}") + except Exception as e: + logging.error(f"Failed to send trace to otel-tracing: {e}") + + +with open(envoy_log_path, "r") as f: + # Seek to the end of the file so we only read new lines + f.seek(0, os.SEEK_END) + while True: + line = f.readline() + if not line: + time.sleep(1) + continue + tokens = line.split("prompt_gateway: upstream_llm trace details: ") + if len(tokens) > 1: + process_log_line(tokens[1]) diff --git a/demos/weather_forecast/docker-compose.yaml b/demos/weather_forecast/docker-compose.yaml index 67a6bf1d..d347c29a 100644 --- a/demos/weather_forecast/docker-compose.yaml +++ b/demos/weather_forecast/docker-compose.yaml @@ -28,3 +28,12 @@ services: ports: - "16686:16686" - "4317:4317" + - "4318:4318" + + trace_streamer: + build: + context: ../shared/trace_streamer + environment: + - OTEL_TRACING_HTTP_ENDPOINT=http://jaeger:4318/v1/traces + volumes: + - ~/archgw_logs:/var/log/ diff --git a/tracing.rest b/tracing.rest new file mode 100644 index 00000000..e277dedd --- /dev/null +++ b/tracing.rest @@ -0,0 +1,31 @@ +POST http://localhost:4318/v1/traces +Content-Type: application/json + +{ + "resourceSpans": [ + { + "resource": { + "attributes": [ + { "key": "service.name", "value": { "stringValue": "upstream-llm" } } + ] + }, + "scopeSpans": [ + { + "scope": { "name": "default", "version": "1.0", "attributes": [] }, + "spans": [ + { + "traceId": "fa8f7c410c28092faafbd7d4a2f5e742", + "spanId": "4dc43055a07410d6", + "parentSpanId": "f0acd74216a5e179", + "name": "archgw", + "startTimeUnixNano": "1731363782228270000", + "endTimeUnixNano": "1731363787843156000", + "kind": 1, + "attributes": [] + } + ] + } + ] + } + ] +}