Add service to stream custom otel traces to otel-collector (#262)

This commit is contained in:
Adil Hafeez 2024-11-12 11:09:40 -08:00 committed by GitHub
parent d87105882b
commit 30647fd508
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 368 additions and 9 deletions

View file

@ -25,5 +25,4 @@ COPY arch/tools/cli/config_generator.py .
COPY arch/envoy.template.yaml .
COPY arch/arch_config_schema.yaml .
ENTRYPOINT ["sh", "-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug"]
ENTRYPOINT ["sh", "-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug 2>&1 | tee /var/log/envoy.log"]

View file

@ -120,7 +120,7 @@ static_resources:
"@type": type.googleapis.com/envoy.extensions.compression.gzip.compressor.v3.Gzip
memory_level: 3
window_bits: 10
- name: envoy.filters.http.wasm
- name: envoy.filters.http.wasm_prompt
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm
@ -137,7 +137,7 @@ static_resources:
code:
local:
filename: "/etc/envoy/proxy-wasm-plugins/prompt_gateway.wasm"
- name: envoy.filters.http.wasm
- name: envoy.filters.http.wasm_llm
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm
@ -498,4 +498,22 @@ static_resources:
socket_address:
address: host.docker.internal
port_value: 4317
- name: opentelemetry_collector_http
type: STRICT_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
explicit_http_config:
http2_protocol_options: {}
load_assignment:
cluster_name: opentelemetry_collector_http
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: host.docker.internal
port_value: 4318
{% endif %}

7
crates/Cargo.lock generated
View file

@ -224,6 +224,7 @@ dependencies = [
"derivative",
"duration-string",
"governor",
"hex",
"log",
"pretty_assertions",
"proxy-wasm",
@ -764,6 +765,12 @@ dependencies = [
"libc",
]
[[package]]
name = "hex"
version = "0.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70"
[[package]]
name = "http"
version = "1.1.0"

View file

@ -15,6 +15,7 @@ thiserror = "1.0.64"
tiktoken-rs = "0.5.9"
rand = "0.8.5"
serde_json = "1.0"
hex = "0.4.3"
[dev-dependencies]
pretty_assertions = "1.4.1"

View file

@ -7,8 +7,9 @@ pub mod embeddings;
pub mod errors;
pub mod http;
pub mod llm_providers;
pub mod pii;
pub mod ratelimit;
pub mod routing;
pub mod stats;
pub mod tokenizer;
pub mod pii;
pub mod tracing;

View file

@ -0,0 +1,177 @@
use rand::RngCore;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug)]
pub struct ResourceSpan {
pub resource: Resource,
#[serde(rename = "scopeSpans")]
pub scope_spans: Vec<ScopeSpan>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Resource {
pub attributes: Vec<Attribute>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ScopeSpan {
scope: Scope,
spans: Vec<Span>,
}
#[derive(Serialize, Deserialize, Debug)]
struct Scope {
name: String,
version: String,
attributes: Vec<Attribute>,
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Span {
#[serde(rename = "traceId")]
pub trace_id: String,
#[serde(rename = "spanId")]
pub span_id: String,
#[serde(rename = "parentSpanId")]
pub parent_span_id: Option<String>, // Optional in case theres no parent span
pub name: String,
#[serde(rename = "startTimeUnixNano")]
pub start_time_unix_nano: String,
#[serde(rename = "endTimeUnixNano")]
pub end_time_unix_nano: String,
pub kind: u32,
pub attributes: Vec<Attribute>,
pub events: Option<Vec<Event>>,
}
impl Span {
pub fn new(
name: String,
parent_trace_id: String,
parent_span_id: Option<String>,
start_time_unix_nano: u128,
end_time_unix_nano: u128,
) -> Self {
Span {
trace_id: parent_trace_id,
span_id: get_random_span_id(),
parent_span_id,
name,
start_time_unix_nano: format!("{}", start_time_unix_nano),
end_time_unix_nano: format!("{}", end_time_unix_nano),
kind: 0,
attributes: Vec::new(),
events: None,
}
}
pub fn add_attribute(&mut self, key: String, value: String) {
self.attributes.push(Attribute {
key,
value: AttributeValue {
string_value: Some(value),
},
});
}
pub fn add_event(&mut self, event: Event) {
if self.events.is_none() {
self.events = Some(Vec::new());
}
self.events.as_mut().unwrap().push(event);
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Event {
#[serde(rename = "timeUnixNano")]
pub time_unix_nano: String,
pub name: String,
pub attributes: Vec<Attribute>,
}
impl Event {
pub fn new(name: String, time_unix_nano: u128) -> Self {
Event {
time_unix_nano: format!("{}", time_unix_nano),
name,
attributes: Vec::new(),
}
}
pub fn add_attribute(&mut self, key: String, value: String) {
self.attributes.push(Attribute {
key,
value: AttributeValue {
string_value: Some(value),
},
});
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct Attribute {
key: String,
value: AttributeValue,
}
#[derive(Serialize, Deserialize, Debug)]
struct AttributeValue {
#[serde(rename = "stringValue")]
string_value: Option<String>, // Use Option to handle different value types
}
#[derive(Serialize, Deserialize, Debug)]
pub struct TraceData {
#[serde(rename = "resourceSpans")]
resource_spans: Vec<ResourceSpan>,
}
impl Default for TraceData {
fn default() -> Self {
Self::new()
}
}
impl TraceData {
pub fn new() -> Self {
TraceData {
resource_spans: Vec::new(),
}
}
pub fn add_span(&mut self, span: Span) {
if self.resource_spans.is_empty() {
let resource = Resource {
attributes: vec![Attribute {
key: "service.name".to_string(),
value: AttributeValue {
string_value: Some("upstream-llm".to_string()),
},
}],
};
let scope_span = ScopeSpan {
scope: Scope {
name: "default".to_string(),
version: "1.0".to_string(),
attributes: Vec::new(),
},
spans: Vec::new(),
};
let resource_span = ResourceSpan {
resource,
scope_spans: vec![scope_span],
};
self.resource_spans.push(resource_span);
}
self.resource_spans[0].scope_spans[0].spans.push(span);
}
}
pub fn get_random_span_id() -> String {
let mut rng = rand::thread_rng();
let mut random_bytes = [0u8; 8];
rng.fill_bytes(&mut random_bytes);
hex::encode(random_bytes)
}

View file

@ -1,4 +1,7 @@
use std::{collections::HashMap, time::Duration};
use std::{
collections::HashMap,
time::{Duration, SystemTime, UNIX_EPOCH},
};
use common::{
common_types::{
@ -13,7 +16,9 @@ use common::{
HEALTHZ_PATH, REQUEST_ID_HEADER, TOOL_ROLE, TRACE_PARENT_HEADER, USER_ROLE,
},
errors::ServerError,
http::{CallArgs, Client}, pii::obfuscate_auth_header,
http::{CallArgs, Client},
pii::obfuscate_auth_header,
tracing::{Event, Span},
};
use http::StatusCode;
use log::{debug, trace, warn};
@ -239,7 +244,7 @@ impl HttpContext for StreamContext {
fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
trace!(
"recv [S={}] bytes={} end_stream={}",
"on_http_response_body: recv [S={}] bytes={} end_stream={}",
self.context_id,
body_size,
end_of_stream
@ -250,6 +255,55 @@ impl HttpContext for StreamContext {
return Action::Continue;
}
if self.time_to_first_token.is_none() {
self.time_to_first_token = Some(
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos(),
);
}
if end_of_stream && body_size == 0 {
if let Some(traceparent) = self.traceparent.as_ref() {
let since_the_epoch_ns = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
let traceparent_tokens = traceparent.split("-").collect::<Vec<&str>>();
if traceparent_tokens.len() != 4 {
warn!("traceparent header is invalid: {}", traceparent);
return Action::Continue;
}
let parent_trace_id = traceparent_tokens[1];
let parent_span_id = traceparent_tokens[2];
let mut trace_data = common::tracing::TraceData::new();
let mut llm_span = Span::new(
"upstream_llm_time".to_string(),
parent_trace_id.to_string(),
Some(parent_span_id.to_string()),
self.start_upstream_llm_request_time,
since_the_epoch_ns,
);
if let Some(prompt) = self.user_prompt.as_ref() {
if let Some(content) = prompt.content.as_ref() {
llm_span.add_attribute("user_prompt".to_string(), content.to_string());
}
}
llm_span.add_event(Event::new(
"time_to_first_token".to_string(),
self.time_to_first_token.unwrap(),
));
trace_data.add_span(llm_span);
let trace_data_str = serde_json::to_string(&trace_data).unwrap();
debug!("upstream_llm trace details: {}", trace_data_str);
// send trace_data to http tracing endpoint
}
return Action::Continue;
}
let body = if self.streaming_response {
let streaming_chunk = match self.get_http_response_body(0, body_size) {
Some(chunk) => chunk,

View file

@ -33,7 +33,7 @@ use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::str::FromStr;
use std::time::Duration;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
#[derive(Debug, Clone)]
pub enum ResponseHandlerType {
@ -77,6 +77,8 @@ pub struct StreamContext {
pub chat_completions_request: Option<ChatCompletionsRequest>,
pub prompt_guards: Rc<PromptGuards>,
pub request_id: Option<String>,
pub start_upstream_llm_request_time: u128,
pub time_to_first_token: Option<u128>,
pub traceparent: Option<String>,
pub tracing: Rc<Option<Tracing>>,
}
@ -113,6 +115,8 @@ impl StreamContext {
request_id: None,
traceparent: None,
tracing,
start_upstream_llm_request_time: 0,
time_to_first_token: None,
}
}
@ -1003,6 +1007,11 @@ impl StreamContext {
};
debug!("archgw => llm request: {}", llm_request_str);
self.start_upstream_llm_request_time = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_nanos();
self.set_http_request_body(0, self.request_body_size, &llm_request_str.into_bytes());
self.resume_http_request();
}

View file

@ -0,0 +1,11 @@
FROM python:3.12-slim as arch
WORKDIR /app
RUN pip install requests
COPY stream_traces.py .
RUN mkdir -p /var/log
RUN touch /var/log/envoy.log
CMD ["python", "stream_traces.py"]

View file

@ -0,0 +1,42 @@
import os
import time
import requests
import logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(levelname)s - %(message)s"
)
otel_tracing_endpoint = os.getenv(
"OTEL_TRACING_HTTP_ENDPOINT", "http://localhost:4318/v1/traces"
)
envoy_log_path = os.getenv("ENVOY_LOG_PATH", "/var/log/envoy.log")
logging.info(f"Using otel-tracing host: {otel_tracing_endpoint}")
logging.info(f"Using envoy log path: {envoy_log_path}")
def process_log_line(line):
try:
response = requests.post(
url=otel_tracing_endpoint,
data=line,
headers={"Content-Type": "application/json"},
)
logging.info(f"Sent trace to otel-tracing: {response.status_code}")
except Exception as e:
logging.error(f"Failed to send trace to otel-tracing: {e}")
with open(envoy_log_path, "r") as f:
# Seek to the end of the file so we only read new lines
f.seek(0, os.SEEK_END)
while True:
line = f.readline()
if not line:
time.sleep(1)
continue
tokens = line.split("prompt_gateway: upstream_llm trace details: ")
if len(tokens) > 1:
process_log_line(tokens[1])

View file

@ -28,3 +28,12 @@ services:
ports:
- "16686:16686"
- "4317:4317"
- "4318:4318"
trace_streamer:
build:
context: ../shared/trace_streamer
environment:
- OTEL_TRACING_HTTP_ENDPOINT=http://jaeger:4318/v1/traces
volumes:
- ~/archgw_logs:/var/log/

31
tracing.rest Normal file
View file

@ -0,0 +1,31 @@
POST http://localhost:4318/v1/traces
Content-Type: application/json
{
"resourceSpans": [
{
"resource": {
"attributes": [
{ "key": "service.name", "value": { "stringValue": "upstream-llm" } }
]
},
"scopeSpans": [
{
"scope": { "name": "default", "version": "1.0", "attributes": [] },
"spans": [
{
"traceId": "fa8f7c410c28092faafbd7d4a2f5e742",
"spanId": "4dc43055a07410d6",
"parentSpanId": "f0acd74216a5e179",
"name": "archgw",
"startTimeUnixNano": "1731363782228270000",
"endTimeUnixNano": "1731363787843156000",
"kind": 1,
"attributes": []
}
]
}
]
}
]
}