diff --git a/envoyfilter/docker-compose.yaml b/envoyfilter/docker-compose.yaml
index 4f237ed0..426ac4d5 100644
--- a/envoyfilter/docker-compose.yaml
+++ b/envoyfilter/docker-compose.yaml
@@ -8,6 +8,7 @@ services:
     volumes:
       - ./envoy.yaml:/etc/envoy/envoy.yaml
       - ./target/wasm32-wasi/release:/etc/envoy/proxy-wasm-plugins
+      - /etc/ssl/cert.pem:/etc/ssl/cert.pem
     networks:
       - envoymesh
     depends_on:
diff --git a/envoyfilter/envoy.yaml b/envoyfilter/envoy.yaml
index 3b616062..65ce61eb 100644
--- a/envoyfilter/envoy.yaml
+++ b/envoyfilter/envoy.yaml
@@ -14,13 +14,29 @@ static_resources:
           "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
           stat_prefix: ingress_http
           codec_type: AUTO
+          scheme_header_transformation:
+            scheme_to_overwrite: https
           route_config:
             name: local_routes
             virtual_hosts:
+            - name: openai
+              domains:
+              - "api.openai.com"
+              routes:
+              - match:
+                  prefix: "/"
+                route:
+                  auto_host_rewrite: true
+                  cluster: openai
             - name: local_service
              domains:
              - "*"
              routes:
+             - match:
+                 prefix: "/v1/chat/completions"
+               route:
+                 auto_host_rewrite: true
+                 cluster: openai
             - match:
                 prefix: "/embeddings"
               route:
@@ -88,6 +104,38 @@ static_resources:
         typed_config:
           "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
   clusters:
+  # LLM Host
+  # Embedding Providers
+  # External LLM Providers
+  - name: openai
+    connect_timeout: 5s
+    type: LOGICAL_DNS
+    lb_policy: ROUND_ROBIN
+    typed_extension_protocol_options:
+      envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
+        "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
+        explicit_http_config:
+          http2_protocol_options: {}
+    load_assignment:
+      cluster_name: openai
+      endpoints:
+      - lb_endpoints:
+        - endpoint:
+            address:
+              socket_address:
+                address: api.openai.com
+                port_value: 443
+            hostname: "api.openai.com"
+    transport_socket:
+      name: envoy.transport_sockets.tls
+      typed_config:
+        "@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext
+        sni: api.openai.com
+        common_tls_context:
+          tls_params:
+            tls_minimum_protocol_version: TLSv1_2
+            tls_maximum_protocol_version: TLSv1_3
+
   - name: httpbin
     connect_timeout: 5s
     type: STRICT_DNS
diff --git a/envoyfilter/src/lib.rs b/envoyfilter/src/lib.rs
index 4eb33c4f..9054716a 100644
--- a/envoyfilter/src/lib.rs
+++ b/envoyfilter/src/lib.rs
@@ -1,9 +1,6 @@
-mod common_types;
-mod configuration;
-mod consts;
-
 use common_types::EmbeddingRequest;
 use log::info;
+use log::warn;
 use serde_json::to_string;
 use stats::IncrementingMetric;
 use stats::Metric;
@@ -14,12 +11,16 @@
 use std::time::Duration;
 use proxy_wasm::traits::*;
 use proxy_wasm::types::*;
+mod common_types;
+mod configuration;
+mod consts;
+mod llm_backend;
 mod stats;

 proxy_wasm::main! {{
     proxy_wasm::set_log_level(LogLevel::Trace);
     proxy_wasm::set_root_context(|_| -> Box<dyn RootContext> {
-        Box::new(HttpHeaderRoot {
+        Box::new(FilterContext {
             callouts: HashMap::new(),
             config: None,
             metrics: WasmMetrics {
@@ -31,14 +32,15 @@
     });
 }}

-struct HttpHeader {
+struct StreamContext {
     context_id: u32,
     config: configuration::Configuration,
     metrics: WasmMetrics,
+    host_header: Option<String>,
 }

 // HttpContext is the trait that allows the Rust code to interact with HTTP objects.
-impl HttpContext for HttpHeader {
+impl HttpContext for StreamContext {
     // Envoy's HTTP model is event driven. The WASM ABI has given implementors events to hook onto
     // the lifecycle of the http request and response.
     fn on_http_request_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
@@ -50,13 +52,21 @@ impl HttpContext for HttpHeader {
         self.metrics.gauge.record(20);
         info!("gauge -> {}", self.metrics.gauge.value());
         self.metrics.histogram.record(30);
-        info!("histogram -> {}", self.metrics.histogram.value());
+        // info!("histogram -> {}", self.metrics.histogram.value());

         // Example of reading the HTTP headers on the incoming request
         for (name, value) in &self.get_http_request_headers() {
             info!("#{} -> {}: {}", self.context_id, name, value);
         }

+        // Save the host header to be used by the filter logic later on.
+        self.host_header = self.get_http_request_header(":host");
+
+        // Remove the Content-Length header because further body manipulations in the gateway logic will invalidate it.
+        // Servers generally throw away requests whose body length does not match the Content-Length header.
+        // However, a missing Content-Length header is not grounds for a bad request, given that intermediary hops
+        // could manipulate the body in benign ways, e.g., compression.
+        self.set_http_request_header("content-length", None);
+
         // Example logic of branching based on a request header.
         match self.get_http_request_header(":path") {
             // If the path header is present and the path is /inline
@@ -77,19 +87,58 @@
                 // Pause the filter until the out of band HTTP response arrives.
                 Action::Pause
             }
-
-            // Otherwise let the HTTP request continue.
+            // The gateway can start gathering information necessary for routing. For now, change the path to an
+            // OpenAI API path.
+            Some(path) if path == "/llmrouting" => {
+                self.set_http_request_header(":path", Some("/v1/chat/completions"));
+                Action::Continue
+            }
+            // Otherwise let the filter continue.
             _ => Action::Continue,
         }
     }

+    fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
+        // Let the filter continue if the request is not meant for OpenAI.
+        match &self.host_header {
+            Some(host) if host != "api.openai.com" => return Action::Continue,
+            _ => {}
+        }
+
+        // Let the client send the gateway all the data before forwarding it to the LLM provider.
+        if !end_of_stream {
+            return Action::Pause;
+        }
+
+        if let Some(body_bytes) = self.get_http_request_body(0, body_size) {
+            let mut deserialized: llm_backend::ChatCompletions =
+                match serde_json::from_slice(&body_bytes) {
+                    Ok(deserialized) => deserialized,
+                    Err(msg) => panic!("Failed to deserialize: {}", msg),
+                };
+
+            warn!("deserialized body = {:?}", deserialized);
+
+            // This is the big moment: the user did not set the model in their request.
+            // The gateway is setting the model for them.
+            deserialized.model = String::from("gpt-3.5-turbo");
+            let json_string = serde_json::to_string(&deserialized).unwrap();
+
+            warn!("serialized json = {}", json_string);
+
+            self.set_http_request_body(0, body_size, &json_string.into_bytes());
+        }
+
+        Action::Continue
+    }
+
     fn on_http_response_headers(&mut self, _: usize, _: bool) -> Action {
         self.set_http_response_header("Powered-By", Some("Katanemo"));
         Action::Continue
     }
 }

-impl Context for HttpHeader {
+impl Context for StreamContext {
     // Note that the event driven model continues here from the return of the on_http_request_headers above.
     fn on_http_call_response(&mut self, _: u32, _: usize, body_size: usize, _: usize) {
         info!("on_http_call_response: body_size = {}", body_size);
@@ -122,14 +171,14 @@ struct WasmMetrics {
     histogram: stats::Histogram,
 }

-struct HttpHeaderRoot {
+struct FilterContext {
     metrics: WasmMetrics,
     // callouts stores token_id to request mapping that we use during #on_http_call_response to match the response to the request.
     callouts: HashMap<u32, EmbeddingRequest>,
     config: Option<configuration::Configuration>,
 }

-impl Context for HttpHeaderRoot {
+impl Context for FilterContext {
     fn on_http_call_response(
         &mut self,
         token_id: u32,
@@ -224,7 +273,7 @@ impl Context for HttpHeaderRoot {
 }

 // RootContext allows the Rust code to reach into the Envoy Config
-impl RootContext for HttpHeaderRoot {
+impl RootContext for FilterContext {
     fn on_configure(&mut self, _: usize) -> bool {
         if let Some(config_bytes) = self.get_plugin_configuration() {
             self.config = serde_yaml::from_slice(&config_bytes).unwrap();
@@ -234,10 +283,11 @@
     fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
-        Some(Box::new(HttpHeader {
+        Some(Box::new(StreamContext {
             context_id,
             config: self.config.clone()?,
             metrics: self.metrics,
+            host_header: None,
         }))
     }
diff --git a/envoyfilter/src/llm_backend.rs b/envoyfilter/src/llm_backend.rs
new file mode 100644
index 00000000..cfa5c867
--- /dev/null
+++ b/envoyfilter/src/llm_backend.rs
@@ -0,0 +1,14 @@
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ChatCompletions {
+    #[serde(default)]
+    pub model: String,
+    pub messages: Vec<Message>,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct Message {
+    pub role: String,
+    pub content: String,
+}
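
For reviewers who want to poke at the body-rewrite logic without spinning up Envoy, here is a minimal, self-contained sketch of the round trip that `on_http_request_body` performs. It is not part of the patch: the structs are copied from `llm_backend.rs`, while the `main` harness and the sample request body are made up for illustration (it assumes `serde` with the `derive` feature and `serde_json`, both already dependencies of the filter crate).

```rust
use serde::{Deserialize, Serialize};

// Copied from llm_backend.rs. `#[serde(default)]` lets a request that omits
// the "model" key deserialize to an empty string instead of failing.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatCompletions {
    #[serde(default)]
    pub model: String,
    pub messages: Vec<Message>,
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
    pub role: String,
    pub content: String,
}

fn main() {
    // A client request that deliberately omits "model", as the filter expects.
    let body = r#"{"messages":[{"role":"user","content":"hello"}]}"#;

    let mut request: ChatCompletions = serde_json::from_slice(body.as_bytes()).unwrap();
    assert_eq!(request.model, ""); // defaulted, not a deserialization error

    // The gateway picks the model on the caller's behalf, mirroring the
    // rewrite the filter does before re-serializing the body for the upstream.
    request.model = String::from("gpt-3.5-turbo");

    let rewritten = serde_json::to_string(&request).unwrap();
    assert!(rewritten.contains(r#""model":"gpt-3.5-turbo""#));
    println!("{}", rewritten);
}
```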
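One design note worth calling out: the rewritten body is almost always a different length from what the client sent, so the client-supplied Content-Length would be wrong by the time the request reaches api.openai.com. That is why `on_http_request_headers` strips the header up front and leaves framing to Envoy, rather than trying to patch the value after the body mutation; since the `openai` cluster speaks HTTP/2 upstream, the request is framed by DATA frames and a Content-Length header is optional anyway.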