Add initial logic to send prompts to LLM API (#9)

Signed-off-by: José Ulises Niño Rivera <junr03@users.noreply.github.com>
This commit is contained in:
José Ulises Niño Rivera 2024-07-19 13:14:48 -07:00 committed by GitHub
parent 31c4ac267a
commit 5b4143d580
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 128 additions and 15 deletions

View file

@ -8,6 +8,7 @@ services:
volumes: volumes:
- ./envoy.yaml:/etc/envoy/envoy.yaml - ./envoy.yaml:/etc/envoy/envoy.yaml
- ./target/wasm32-wasi/release:/etc/envoy/proxy-wasm-plugins - ./target/wasm32-wasi/release:/etc/envoy/proxy-wasm-plugins
- /etc/ssl/cert.pem:/etc/ssl/cert.pem
networks: networks:
- envoymesh - envoymesh
depends_on: depends_on:

View file

@ -14,13 +14,29 @@ static_resources:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http stat_prefix: ingress_http
codec_type: AUTO codec_type: AUTO
scheme_header_transformation:
scheme_to_overwrite: https
route_config: route_config:
name: local_routes name: local_routes
virtual_hosts: virtual_hosts:
- name: openai
domains:
- "api.openai.com"
routes:
- match:
prefix: "/"
route:
auto_host_rewrite: true
cluster: openai
- name: local_service - name: local_service
domains: domains:
- "*" - "*"
routes: routes:
- match:
prefix: "/v1/chat/completions"
route:
auto_host_rewrite: true
cluster: openai
- match: - match:
prefix: "/embeddings" prefix: "/embeddings"
route: route:
@ -88,6 +104,38 @@ static_resources:
typed_config: typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters: clusters:
# LLM Host
# Embedding Providers
# External LLM Providers
- name: openai
connect_timeout: 5s
type: LOGICAL_DNS
lb_policy: ROUND_ROBIN
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
explicit_http_config:
http2_protocol_options: {}
load_assignment:
cluster_name: openai
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: api.openai.com
port_value: 443
hostname: "api.openai.com"
transport_socket:
name: envoy.transport_sockets.tls
typed_config:
"@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext
sni: api.openai.com
common_tls_context:
tls_params:
tls_minimum_protocol_version: TLSv1_2
tls_maximum_protocol_version: TLSv1_3
- name: httpbin - name: httpbin
connect_timeout: 5s connect_timeout: 5s
type: STRICT_DNS type: STRICT_DNS

View file

@ -1,9 +1,6 @@
mod common_types;
mod configuration;
mod consts;
use common_types::EmbeddingRequest; use common_types::EmbeddingRequest;
use log::info; use log::info;
use log::warn;
use serde_json::to_string; use serde_json::to_string;
use stats::IncrementingMetric; use stats::IncrementingMetric;
use stats::Metric; use stats::Metric;
@ -14,12 +11,16 @@ use std::time::Duration;
use proxy_wasm::traits::*; use proxy_wasm::traits::*;
use proxy_wasm::types::*; use proxy_wasm::types::*;
mod common_types;
mod configuration;
mod consts;
mod llm_backend;
mod stats; mod stats;
proxy_wasm::main! {{ proxy_wasm::main! {{
proxy_wasm::set_log_level(LogLevel::Trace); proxy_wasm::set_log_level(LogLevel::Trace);
proxy_wasm::set_root_context(|_| -> Box<dyn RootContext> { proxy_wasm::set_root_context(|_| -> Box<dyn RootContext> {
Box::new(HttpHeaderRoot { Box::new(FilterContext {
callouts: HashMap::new(), callouts: HashMap::new(),
config: None, config: None,
metrics: WasmMetrics { metrics: WasmMetrics {
@ -31,14 +32,15 @@ proxy_wasm::main! {{
}); });
}} }}
struct HttpHeader { struct StreamContext {
context_id: u32, context_id: u32,
config: configuration::Configuration, config: configuration::Configuration,
metrics: WasmMetrics, metrics: WasmMetrics,
host_header: Option<String>,
} }
// HttpContext is the trait that allows the Rust code to interact with HTTP objects. // HttpContext is the trait that allows the Rust code to interact with HTTP objects.
impl HttpContext for HttpHeader { impl HttpContext for StreamContext {
// Envoy's HTTP model is event driven. The WASM ABI has given implementors events to hook onto // Envoy's HTTP model is event driven. The WASM ABI has given implementors events to hook onto
// the lifecycle of the http request and response. // the lifecycle of the http request and response.
fn on_http_request_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action { fn on_http_request_headers(&mut self, _num_headers: usize, _end_of_stream: bool) -> Action {
@ -50,13 +52,21 @@ impl HttpContext for HttpHeader {
self.metrics.gauge.record(20); self.metrics.gauge.record(20);
info!("gauge -> {}", self.metrics.gauge.value()); info!("gauge -> {}", self.metrics.gauge.value());
self.metrics.histogram.record(30); self.metrics.histogram.record(30);
info!("histogram -> {}", self.metrics.histogram.value()); // info!("histogram -> {}", self.metrics.histogram.value());
// Example of reading the HTTP headers on the incoming request // Example of reading the HTTP headers on the incoming request
for (name, value) in &self.get_http_request_headers() { for (name, value) in &self.get_http_request_headers() {
info!("#{} -> {}: {}", self.context_id, name, value); info!("#{} -> {}: {}", self.context_id, name, value);
} }
// Save the host header to be used by filter logic later on.
self.host_header = self.get_http_request_header(":host");
// Remove the Content-Length header because further body manipulations in the gateway logic will invalidate it.
// Servers generally throw away requests whose body length does not match the Content-Length header.
// However, a missing Content-Length header is not grounds for bad requests given that intermediary hops could
// manipulate the body in benign ways e.g., compression.
self.set_http_request_header("content-length", None);
// Example logic of branching based on a request header. // Example logic of branching based on a request header.
match self.get_http_request_header(":path") { match self.get_http_request_header(":path") {
// If the path header is present and the path is /inline // If the path header is present and the path is /inline
@ -77,19 +87,58 @@ impl HttpContext for HttpHeader {
// Pause the filter until the out of band HTTP response arrives. // Pause the filter until the out of band HTTP response arrives.
Action::Pause Action::Pause
} }
// The gateway can start gathering information necessary for routing. For now change the path to an
// Otherwise let the HTTP request continue. // OpenAI API path.
Some(path) if path == "/llmrouting" => {
self.set_http_request_header(":path", Some("/v1/chat/completions"));
Action::Continue
}
// Otherwise let the filter continue.
_ => Action::Continue, _ => Action::Continue,
} }
} }
/// Rewrites the buffered request body for OpenAI-bound requests, injecting the
/// model name on the caller's behalf. Non-OpenAI traffic passes through untouched.
///
/// Returns `Action::Pause` while the body is still streaming in (so the full body
/// can be rewritten atomically) and `Action::Continue` once the rewrite is done.
fn on_http_request_body(&mut self, body_size: usize, end_of_stream: bool) -> Action {
    // Let the filter continue if the request is not meant for OpenAI.
    match &self.host_header {
        Some(host) if host != "api.openai.com" => return Action::Continue,
        _ => {}
    }

    // Buffer the whole client body before forwarding to the LLM provider: the body
    // is rewritten below, so partial chunks cannot be sent upstream as-is.
    if !end_of_stream {
        return Action::Pause;
    }

    if let Some(body_bytes) = self.get_http_request_body(0, body_size) {
        // Malformed JSON is a client error: answer 400 instead of panicking.
        // A panic here would abort the whole Wasm VM and take down every other
        // stream being served by this filter instance on the worker.
        let mut deserialized: llm_backend::ChatCompletions =
            match serde_json::from_slice(&body_bytes) {
                Ok(deserialized) => deserialized,
                Err(msg) => {
                    warn!("Failed to deserialize request body: {}", msg);
                    self.send_http_response(
                        400,
                        vec![],
                        Some(format!("Failed to deserialize: {}", msg).as_bytes()),
                    );
                    return Action::Pause;
                }
            };
        warn!("deserialized body = {:?}", deserialized);

        // This is the big moment here: the user did not set the model in their request.
        // The gateway is setting the model for them.
        deserialized.model = String::from("gpt-3.5-turbo");

        // Serializing a struct we just deserialized cannot fail in practice.
        let json_string = serde_json::to_string(&deserialized).unwrap();
        warn!("serialized json = {}", json_string);
        self.set_http_request_body(0, body_size, &json_string.into_bytes())
    }

    Action::Continue
}
fn on_http_response_headers(&mut self, _: usize, _: bool) -> Action { fn on_http_response_headers(&mut self, _: usize, _: bool) -> Action {
self.set_http_response_header("Powered-By", Some("Katanemo")); self.set_http_response_header("Powered-By", Some("Katanemo"));
Action::Continue Action::Continue
} }
} }
impl Context for HttpHeader { impl Context for StreamContext {
// Note that the event driven model continues here from the return of the on_http_request_headers above. // Note that the event driven model continues here from the return of the on_http_request_headers above.
fn on_http_call_response(&mut self, _: u32, _: usize, body_size: usize, _: usize) { fn on_http_call_response(&mut self, _: u32, _: usize, body_size: usize, _: usize) {
info!("on_http_call_response: body_size = {}", body_size); info!("on_http_call_response: body_size = {}", body_size);
@ -122,14 +171,14 @@ struct WasmMetrics {
histogram: stats::Histogram, histogram: stats::Histogram,
} }
struct HttpHeaderRoot { struct FilterContext {
metrics: WasmMetrics, metrics: WasmMetrics,
// callouts stores token_id to request mapping that we use during #on_http_call_response to match the response to the request. // callouts stores token_id to request mapping that we use during #on_http_call_response to match the response to the request.
callouts: HashMap<u32, common_types::CalloutData>, callouts: HashMap<u32, common_types::CalloutData>,
config: Option<configuration::Configuration>, config: Option<configuration::Configuration>,
} }
impl Context for HttpHeaderRoot { impl Context for FilterContext {
fn on_http_call_response( fn on_http_call_response(
&mut self, &mut self,
token_id: u32, token_id: u32,
@ -224,7 +273,7 @@ impl Context for HttpHeaderRoot {
} }
// RootContext allows the Rust code to reach into the Envoy Config // RootContext allows the Rust code to reach into the Envoy Config
impl RootContext for HttpHeaderRoot { impl RootContext for FilterContext {
fn on_configure(&mut self, _: usize) -> bool { fn on_configure(&mut self, _: usize) -> bool {
if let Some(config_bytes) = self.get_plugin_configuration() { if let Some(config_bytes) = self.get_plugin_configuration() {
self.config = serde_yaml::from_slice(&config_bytes).unwrap(); self.config = serde_yaml::from_slice(&config_bytes).unwrap();
@ -234,10 +283,11 @@ impl RootContext for HttpHeaderRoot {
} }
fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> { fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
Some(Box::new(HttpHeader { Some(Box::new(StreamContext {
context_id, context_id,
config: self.config.clone()?, config: self.config.clone()?,
metrics: self.metrics, metrics: self.metrics,
host_header: None,
})) }))
} }

View file

@ -0,0 +1,14 @@
use serde::{Deserialize, Serialize};
/// Request payload for the OpenAI-compatible `/v1/chat/completions` endpoint.
/// Field names follow the wire format, so serde renaming is unnecessary.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ChatCompletions {
/// Model identifier (e.g. "gpt-3.5-turbo"). Defaults to an empty string when the
/// client omits it, so the gateway can fill it in before forwarding upstream.
#[serde(default)]
pub model: String,
/// Ordered conversation history sent to the LLM provider.
pub messages: Vec<Message>,
}
/// A single chat message in a `ChatCompletions` request.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
/// Speaker role as defined by the OpenAI chat API (e.g. "system", "user", "assistant").
pub role: String,
/// Text content of the message.
pub content: String,
}