From 3163a1c14927ea74f78a9114305c9c3dbd3e8da6 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Tue, 5 Nov 2024 11:57:22 -0800 Subject: [PATCH] wip --- arch/Dockerfile | 2 +- arch/envoy.template.yaml | 22 ++++++++-- crates/prompt_gateway/src/http_context.rs | 47 +++++++++++++++++++-- crates/prompt_gateway/src/stream_context.rs | 6 +++ 4 files changed, 70 insertions(+), 7 deletions(-) diff --git a/arch/Dockerfile b/arch/Dockerfile index 6d7a0f9b..72f7de4d 100644 --- a/arch/Dockerfile +++ b/arch/Dockerfile @@ -26,4 +26,4 @@ COPY arch/envoy.template.yaml . COPY arch/arch_config_schema.yaml . -ENTRYPOINT ["sh", "-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug"] +ENTRYPOINT ["sh", "-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:trace"] diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml index 40ddd9a8..af4e356e 100644 --- a/arch/envoy.template.yaml +++ b/arch/envoy.template.yaml @@ -7,7 +7,7 @@ static_resources: address: socket_address: address: 0.0.0.0 - port_value: 10000 + port_value: 9000 traffic_direction: INBOUND filter_chains: - filters: @@ -40,6 +40,10 @@ static_resources: path: "/var/log/access_ingress.log" route_config: name: local_routes + request_headers_to_add: + - header: + key: "x-envoy-force-trace" + value: "true" virtual_hosts: - name: local_service domains: @@ -60,7 +64,7 @@ static_resources: address: socket_address: address: 0.0.0.0 - port_value: 10001 + port_value: 10000 traffic_direction: INBOUND filter_chains: - filters: @@ -93,6 +97,10 @@ static_resources: path: "/var/log/access_ingress_prompt.log" route_config: name: local_routes + request_headers_to_add: + - header: + key: "x-envoy-force-trace" + value: "true" virtual_hosts: - name: local_service domains: @@ -184,6 +192,10 @@ static_resources: path: 
"/var/log/access_internal.log" route_config: name: local_routes + request_headers_to_add: + - header: + key: "x-envoy-force-trace" + value: "true" virtual_hosts: - name: local_service domains: @@ -256,6 +268,10 @@ static_resources: path: "/var/log/access_llm.log" route_config: name: local_routes + request_headers_to_add: + - header: + key: "x-envoy-force-trace" + value: "true" virtual_hosts: - name: local_service domains: @@ -454,7 +470,7 @@ static_resources: address: socket_address: address: 0.0.0.0 - port_value: 10001 + port_value: 10000 hostname: arch_prompt_gateway_listener - name: arch_llm_listener diff --git a/crates/prompt_gateway/src/http_context.rs b/crates/prompt_gateway/src/http_context.rs index 55b77213..0ea27cfb 100644 --- a/crates/prompt_gateway/src/http_context.rs +++ b/crates/prompt_gateway/src/http_context.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, fmt::Write, time::Duration}; use common::{ common_types::{ @@ -8,16 +8,20 @@ use common::{ PromptGuardRequest, PromptGuardTask, }, consts::{ - ARCH_FC_MODEL_NAME, ARCH_INTERNAL_CLUSTER_NAME, ARCH_STATE_HEADER, ARCH_UPSTREAM_HOST_HEADER, ASSISTANT_ROLE, CHAT_COMPLETIONS_PATH, GUARD_INTERNAL_HOST, HEALTHZ_PATH, REQUEST_ID_HEADER, TOOL_ROLE, TRACE_PARENT_HEADER, USER_ROLE + ARCH_FC_MODEL_NAME, ARCH_INTERNAL_CLUSTER_NAME, ARCH_STATE_HEADER, + ARCH_UPSTREAM_HOST_HEADER, ASSISTANT_ROLE, CHAT_COMPLETIONS_PATH, GUARD_INTERNAL_HOST, + HEALTHZ_PATH, REQUEST_ID_HEADER, TOOL_ROLE, TRACE_PARENT_HEADER, USER_ROLE, }, errors::ServerError, http::{CallArgs, Client}, }; use http::StatusCode; -use log::{debug, trace, warn}; +use log::{debug, info, trace, warn}; use proxy_wasm::{traits::HttpContext, types::Action}; use serde_json::Value; +use rand::{distributions::Alphanumeric, thread_rng, Rng}; + use crate::stream_context::{ResponseHandlerType, StreamCallContext, StreamContext}; // HttpContext is the trait that allows the Rust code to interact with HTTP objects. 
@@ -51,13 +55,38 @@ impl HttpContext for StreamContext { self.request_id = self.get_http_request_header(REQUEST_ID_HEADER); self.traceparent = self.get_http_request_header(TRACE_PARENT_HEADER); + if self.traceparent.is_none() { + // let trace_id: String = generate_random_hex_string(16); + // self.set_http_request_header("x-client-trace-id", Some(trace_id.as_str())); + } + // let trace_id: String = generate_random_hex_string(16); + // let parent_id: String = generate_random_hex_string(8); + + // // let's add a traceparent header if it's not present + // let trace_version = "00"; + // //TODO: use 01 if sampled, 00 if not sampled. Hard coded to sampled for now. + // let trace_flags = "01"; + + // let trace_id = format!( + // "{}-{}-{}-{}", + // trace_version, trace_id, parent_id, trace_flags + // ); + + // debug!("attaching traceparent header: {}", trace_id); + + // self.traceparent = Some(trace_id.clone()); + // self.set_http_request_header(TRACE_PARENT_HEADER, Some(trace_id.as_str())); + // } + + self.set_http_request_header("x-envoy-force-trace", Some("true")); Action::Continue } fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action { // Let the client send the gateway all the data before sending to the LLM_provider. // TODO: consider a streaming API. 
+ if !end_of_stream { return Action::Pause; } @@ -360,3 +389,15 @@ impl HttpContext for StreamContext { Action::Continue } } + +fn generate_random_hex_string(len: usize) -> String { + let mut rng = thread_rng(); + let mut hex_string = String::with_capacity(len); + + for _ in 0..len { + let byte = rng.gen::<u8>(); + write!(&mut hex_string, "{:02x}", byte).unwrap(); + } + + hex_string +} diff --git a/crates/prompt_gateway/src/stream_context.rs b/crates/prompt_gateway/src/stream_context.rs index 3d861dab..c068072b 100644 --- a/crates/prompt_gateway/src/stream_context.rs +++ b/crates/prompt_gateway/src/stream_context.rs @@ -155,6 +155,7 @@ impl StreamContext { ("content-type", "application/json"), ("x-envoy-max-retries", "3"), ("x-envoy-upstream-rq-timeout-ms", "60000"), + ("x-envoy-force-trace", "true"), ]; if self.request_id.is_some() { @@ -284,6 +285,7 @@ impl StreamContext { ("content-type", "application/json"), ("x-envoy-max-retries", "3"), ("x-envoy-upstream-rq-timeout-ms", "60000"), + ("x-envoy-force-trace", "true"), ]; if self.request_id.is_some() { @@ -484,6 +486,7 @@ impl StreamContext { ("content-type", "application/json"), ("x-envoy-max-retries", "3"), ("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()), + ("x-envoy-force-trace", "true"), ]; if self.request_id.is_some() { @@ -635,6 +638,7 @@ impl StreamContext { ("content-type", "application/json"), ("x-envoy-max-retries", "3"), ("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()), + ("x-envoy-force-trace", "true"), ]; if self.request_id.is_some() { @@ -815,6 +819,7 @@ impl StreamContext { ("content-type", "application/json"), ("x-envoy-max-retries", "3"), ("x-envoy-upstream-rq-timeout-ms", "60000"), + ("x-envoy-force-trace", "true"), ]; if self.request_id.is_some() { @@ -870,6 +875,7 @@ impl StreamContext { (":authority", endpoint.name.as_str()), ("content-type", "application/json"), ("x-envoy-max-retries", "3"), + ("x-envoy-force-trace", "true"), ]; if self.request_id.is_some() {