split wasm filter

This commit is contained in:
Adil Hafeez 2024-10-15 17:29:00 -07:00
parent b1746b38b4
commit 0e04b09f56
44 changed files with 6009 additions and 272 deletions

2159
arch/Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -1,29 +0,0 @@
[package]
name = "intelligent-prompt-gateway"
version = "0.1.0"
authors = ["Katanemo Inc <info@katanemo.com>"]
edition = "2021"
[lib]
crate-type = ["cdylib"]
[dependencies]
proxy-wasm = "0.2.1"
log = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_yaml = "0.9.34"
serde_json = "1.0"
md5 = "0.7.0"
public_types = { path = "../public_types" }
http = "1.1.0"
governor = { version = "0.6.3", default-features = false, features = ["no_std"]}
tiktoken-rs = "0.5.9"
acap = "0.3.0"
rand = "0.8.5"
thiserror = "1.0.64"
derivative = "2.2.0"
sha2 = "0.10.8"
[dev-dependencies]
proxy-wasm-test-framework = { git = "https://github.com/katanemo/test-framework.git", branch = "new" }
serial_test = "3.1.1"

View file

@ -2,19 +2,18 @@
FROM rust:1.80.0 as builder
RUN rustup -v target add wasm32-wasi
WORKDIR /arch
COPY arch/src /arch/src
COPY arch/Cargo.toml /arch/
COPY arch/Cargo.lock /arch/
COPY public_types /public_types
COPY crates .
RUN cargo build --release --target wasm32-wasi
RUN cd prompt_gateway && cargo build --release --target wasm32-wasi
RUN cd llm_gateway && cargo build --release --target wasm32-wasi
# copy built filter into envoy image
FROM envoyproxy/envoy:v1.31-latest as envoy
#Build config generator, so that we have a single build image for both Rust and Python
FROM python:3-slim as arch
COPY --from=builder /arch/target/wasm32-wasi/release/intelligent_prompt_gateway.wasm /etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm
COPY --from=builder /arch/prompt_gateway/target/wasm32-wasi/release/prompt_gateway.wasm /etc/envoy/proxy-wasm-plugins/prompt_gateway.wasm
COPY --from=builder /arch/llm_gateway/target/wasm32-wasi/release/llm_gateway.wasm /etc/envoy/proxy-wasm-plugins/llm_gateway.wasm
COPY --from=envoy /usr/local/bin/envoy /usr/local/bin/envoy
WORKDIR /config
COPY arch/requirements.txt .

View file

@ -90,7 +90,7 @@ static_resources:
runtime: "envoy.wasm.runtime.v8"
code:
local:
filename: "/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm"
filename: "/etc/envoy/proxy-wasm-plugins/prompt_gateway.wasm"
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
@ -250,7 +250,7 @@ static_resources:
runtime: "envoy.wasm.runtime.v8"
code:
local:
filename: "/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm"
filename: "/etc/envoy/proxy-wasm-plugins/prompt_gateway.wasm"
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router

View file

@ -1,9 +0,0 @@
apiVersion: 1
datasources:
- name: Prometheus
type: prometheus
url: http://prometheus:9090
isDefault: true
access: proxy
editable: true

View file

@ -1,23 +0,0 @@
global:
scrape_interval: 15s
scrape_timeout: 10s
evaluation_interval: 15s
alerting:
alertmanagers:
- static_configs:
- targets: []
scheme: http
timeout: 10s
api_version: v1
scrape_configs:
- job_name: envoy
honor_timestamps: true
scrape_interval: 15s
scrape_timeout: 10s
metrics_path: /stats
scheme: http
static_configs:
- targets:
- envoy:9901
params:
format: ['prometheus']

View file

@ -1,22 +0,0 @@
pub const DEFAULT_EMBEDDING_MODEL: &str = "katanemo/bge-large-en-v1.5";
pub const DEFAULT_INTENT_MODEL: &str = "katanemo/bart-large-mnli";
pub const DEFAULT_PROMPT_TARGET_THRESHOLD: f64 = 0.8;
pub const DEFAULT_HALLUCINATED_THRESHOLD: f64 = 0.25;
pub const RATELIMIT_SELECTOR_HEADER_KEY: &str = "x-arch-ratelimit-selector";
pub const SYSTEM_ROLE: &str = "system";
pub const USER_ROLE: &str = "user";
pub const GPT_35_TURBO: &str = "gpt-3.5-turbo";
pub const ARC_FC_CLUSTER: &str = "arch_fc";
pub const ARCH_FC_REQUEST_TIMEOUT_MS: u64 = 120000; // 2 minutes
pub const MODEL_SERVER_NAME: &str = "model_server";
pub const ARCH_ROUTING_HEADER: &str = "x-arch-llm-provider";
pub const ARCH_MESSAGES_KEY: &str = "arch_messages";
pub const ARCH_PROVIDER_HINT_HEADER: &str = "x-arch-llm-provider-hint";
pub const CHAT_COMPLETIONS_PATH: &str = "v1/chat/completions";
pub const ARCH_STATE_HEADER: &str = "x-arch-state";
pub const ARCH_FC_MODEL_NAME: &str = "Arch-Function-1.5B";
pub const REQUEST_ID_HEADER: &str = "x-request-id";
pub const ARCH_INTERNAL_CLUSTER_NAME: &str = "arch_internal";
pub const ARCH_UPSTREAM_HOST_HEADER: &str = "x-arch-upstream";
pub const ARCH_LLM_UPSTREAM_LISTENER: &str = "arch_llm_listener";
pub const ARCH_MODEL_PREFIX: &str = "Arch";

View file

@ -1,321 +0,0 @@
use crate::consts::{
ARCH_INTERNAL_CLUSTER_NAME, ARCH_UPSTREAM_HOST_HEADER, DEFAULT_EMBEDDING_MODEL,
MODEL_SERVER_NAME,
};
use crate::http::{CallArgs, Client};
use crate::llm_providers::LlmProviders;
use crate::ratelimit;
use crate::stats::{Counter, Gauge, IncrementingMetric};
use crate::stream_context::StreamContext;
use log::debug;
use proxy_wasm::traits::*;
use proxy_wasm::types::*;
use public_types::common_types::EmbeddingType;
use public_types::configuration::{
Configuration, GatewayMode, Overrides, PromptGuards, PromptTarget,
};
use public_types::embeddings::{
CreateEmbeddingRequest, CreateEmbeddingRequestInput, CreateEmbeddingResponse,
};
use std::cell::RefCell;
use std::collections::hash_map::Entry;
use std::collections::HashMap;
use std::rc::Rc;
use std::time::Duration;
#[derive(Copy, Clone, Debug)]
pub struct WasmMetrics {
pub active_http_calls: Gauge,
pub ratelimited_rq: Counter,
}
impl WasmMetrics {
fn new() -> WasmMetrics {
WasmMetrics {
active_http_calls: Gauge::new(String::from("active_http_calls")),
ratelimited_rq: Counter::new(String::from("ratelimited_rq")),
}
}
}
pub type EmbeddingTypeMap = HashMap<EmbeddingType, Vec<f64>>;
pub type EmbeddingsStore = HashMap<String, EmbeddingTypeMap>;
#[derive(Debug)]
pub struct FilterCallContext {
pub prompt_target_name: String,
pub embedding_type: EmbeddingType,
}
#[derive(Debug)]
pub struct FilterContext {
metrics: Rc<WasmMetrics>,
// callouts stores token_id to request mapping that we use during #on_http_call_response to match the response to the request.
callouts: RefCell<HashMap<u32, FilterCallContext>>,
overrides: Rc<Option<Overrides>>,
system_prompt: Rc<Option<String>>,
prompt_targets: Rc<HashMap<String, PromptTarget>>,
mode: GatewayMode,
prompt_guards: Rc<PromptGuards>,
llm_providers: Option<Rc<LlmProviders>>,
embeddings_store: Option<Rc<EmbeddingsStore>>,
temp_embeddings_store: EmbeddingsStore,
}
impl FilterContext {
pub fn new() -> FilterContext {
FilterContext {
callouts: RefCell::new(HashMap::new()),
metrics: Rc::new(WasmMetrics::new()),
system_prompt: Rc::new(None),
prompt_targets: Rc::new(HashMap::new()),
overrides: Rc::new(None),
prompt_guards: Rc::new(PromptGuards::default()),
mode: GatewayMode::Prompt,
llm_providers: None,
embeddings_store: Some(Rc::new(HashMap::new())),
temp_embeddings_store: HashMap::new(),
}
}
fn process_prompt_targets(&self) {
for values in self.prompt_targets.iter() {
let prompt_target = values.1;
self.schedule_embeddings_call(
&prompt_target.name,
&prompt_target.description,
EmbeddingType::Description,
);
}
}
fn schedule_embeddings_call(
&self,
prompt_target_name: &str,
input: &str,
embedding_type: EmbeddingType,
) {
let embeddings_input = CreateEmbeddingRequest {
input: Box::new(CreateEmbeddingRequestInput::String(String::from(input))),
model: String::from(DEFAULT_EMBEDDING_MODEL),
encoding_format: None,
dimensions: None,
user: None,
};
let json_data = serde_json::to_string(&embeddings_input).unwrap();
let call_args = CallArgs::new(
ARCH_INTERNAL_CLUSTER_NAME,
"/embeddings",
vec![
(ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME),
(":method", "POST"),
(":path", "/embeddings"),
(":authority", MODEL_SERVER_NAME),
("content-type", "application/json"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
],
Some(json_data.as_bytes()),
vec![],
Duration::from_secs(60),
);
let call_context = crate::filter_context::FilterCallContext {
prompt_target_name: String::from(prompt_target_name),
embedding_type,
};
if let Err(error) = self.http_call(call_args, call_context) {
panic!("{error}")
}
}
fn embedding_response_handler(
&mut self,
body_size: usize,
embedding_type: EmbeddingType,
prompt_target_name: String,
) {
let prompt_target = self
.prompt_targets
.get(&prompt_target_name)
.unwrap_or_else(|| {
panic!(
"Received embeddings response for unknown prompt target name={}",
prompt_target_name
)
});
let body = self
.get_http_call_response_body(0, body_size)
.expect("No body in response");
if !body.is_empty() {
let mut embedding_response: CreateEmbeddingResponse =
match serde_json::from_slice(&body) {
Ok(response) => response,
Err(e) => {
panic!(
"Error deserializing embedding response. body: {:?}: {:?}",
String::from_utf8(body).unwrap(),
e
);
}
};
let embeddings = embedding_response.data.remove(0).embedding;
debug!(
"Adding embeddings for prompt target name: {:?}, description: {:?}, embedding type: {:?}",
prompt_target.name,
prompt_target.description,
embedding_type
);
let entry = self.temp_embeddings_store.entry(prompt_target_name);
match entry {
Entry::Occupied(_) => {
entry.and_modify(|e| {
if let Entry::Vacant(e) = e.entry(embedding_type) {
e.insert(embeddings);
} else {
panic!(
"Duplicate {:?} for prompt target with name=\"{}\"",
&embedding_type, prompt_target.name
)
}
});
}
Entry::Vacant(_) => {
entry.or_insert(HashMap::from([(embedding_type, embeddings)]));
}
}
if self.prompt_targets.len() == self.temp_embeddings_store.len() {
self.embeddings_store =
Some(Rc::new(std::mem::take(&mut self.temp_embeddings_store)))
}
}
}
}
impl Client for FilterContext {
type CallContext = FilterCallContext;
fn callouts(&self) -> &RefCell<HashMap<u32, Self::CallContext>> {
&self.callouts
}
fn active_http_calls(&self) -> &Gauge {
&self.metrics.active_http_calls
}
}
impl Context for FilterContext {
fn on_http_call_response(
&mut self,
token_id: u32,
_num_headers: usize,
body_size: usize,
_num_trailers: usize,
) {
debug!(
"filter_context: on_http_call_response called with token_id: {:?}",
token_id
);
let callout_data = self
.callouts
.borrow_mut()
.remove(&token_id)
.expect("invalid token_id");
self.metrics.active_http_calls.increment(-1);
self.embedding_response_handler(
body_size,
callout_data.embedding_type,
callout_data.prompt_target_name,
)
}
}
// RootContext allows the Rust code to reach into the Envoy Config
impl RootContext for FilterContext {
fn on_configure(&mut self, _: usize) -> bool {
let config_bytes = self
.get_plugin_configuration()
.expect("Arch config cannot be empty");
let config: Configuration = match serde_yaml::from_slice(&config_bytes) {
Ok(config) => config,
Err(err) => panic!("Invalid arch config \"{:?}\"", err),
};
self.overrides = Rc::new(config.overrides);
let mut prompt_targets = HashMap::new();
for pt in config.prompt_targets {
prompt_targets.insert(pt.name.clone(), pt.clone());
}
self.system_prompt = Rc::new(config.system_prompt);
self.prompt_targets = Rc::new(prompt_targets);
self.mode = config.mode.unwrap_or_default();
ratelimit::ratelimits(Some(config.ratelimits.unwrap_or_default()));
if let Some(prompt_guards) = config.prompt_guards {
self.prompt_guards = Rc::new(prompt_guards)
}
match config.llm_providers.try_into() {
Ok(llm_providers) => self.llm_providers = Some(Rc::new(llm_providers)),
Err(err) => panic!("{err}"),
}
true
}
fn create_http_context(&self, context_id: u32) -> Option<Box<dyn HttpContext>> {
debug!(
"||| create_http_context called with context_id: {:?} |||",
context_id
);
// No StreamContext can be created until the Embedding Store is fully initialized.
let embedding_store = match self.mode {
GatewayMode::Llm => None,
GatewayMode::Prompt => Some(Rc::clone(self.embeddings_store.as_ref().unwrap())),
};
Some(Box::new(StreamContext::new(
context_id,
Rc::clone(&self.metrics),
Rc::clone(&self.system_prompt),
Rc::clone(&self.prompt_targets),
Rc::clone(&self.prompt_guards),
Rc::clone(&self.overrides),
Rc::clone(
self.llm_providers
.as_ref()
.expect("LLM Providers must exist when Streams are being created"),
),
embedding_store,
self.mode.clone(),
)))
}
fn get_type(&self) -> Option<ContextType> {
Some(ContextType::HttpContext)
}
fn on_vm_start(&mut self, _: usize) -> bool {
self.set_tick_period(Duration::from_secs(1));
true
}
fn on_tick(&mut self) {
debug!("starting up arch filter in mode: {:?}", self.mode);
if self.mode == GatewayMode::Prompt {
self.process_prompt_targets();
}
self.set_tick_period(Duration::from_secs(0));
}
}

View file

@ -1,93 +0,0 @@
use crate::stats::{Gauge, IncrementingMetric};
use derivative::Derivative;
use log::debug;
use proxy_wasm::{traits::Context, types::Status};
use serde::Serialize;
use std::{cell::RefCell, collections::HashMap, fmt::Debug, time::Duration};
#[derive(Derivative, Serialize)]
#[derivative(Debug)]
pub struct CallArgs<'a> {
upstream: &'a str,
path: &'a str,
headers: Vec<(&'a str, &'a str)>,
#[derivative(Debug = "ignore")]
body: Option<&'a [u8]>,
trailers: Vec<(&'a str, &'a str)>,
timeout: Duration,
}
impl<'a> CallArgs<'a> {
pub fn new(
upstream: &'a str,
path: &'a str,
headers: Vec<(&'a str, &'a str)>,
body: Option<&'a [u8]>,
trailers: Vec<(&'a str, &'a str)>,
timeout: Duration,
) -> Self {
CallArgs {
upstream,
path,
headers,
body,
trailers,
timeout,
}
}
}
#[derive(thiserror::Error, Debug)]
pub enum ClientError {
#[error("Error dispatching HTTP call to `{upstream_name}/{path}`, error: {internal_status:?}")]
DispatchError {
upstream_name: String,
path: String,
internal_status: Status,
},
}
pub trait Client: Context {
type CallContext: Debug;
fn http_call(
&self,
call_args: CallArgs,
call_context: Self::CallContext,
) -> Result<u32, ClientError> {
debug!(
"dispatching http call with args={:?} context={:?}",
call_args, call_context
);
match self.dispatch_http_call(
call_args.upstream,
call_args.headers,
call_args.body,
call_args.trailers,
call_args.timeout,
) {
Ok(id) => {
self.add_call_context(id, call_context);
Ok(id)
}
Err(status) => Err(ClientError::DispatchError {
upstream_name: String::from(call_args.upstream),
path: String::from(call_args.path),
internal_status: status,
}),
}
}
fn add_call_context(&self, id: u32, call_context: Self::CallContext) {
let callouts = self.callouts();
if callouts.borrow_mut().insert(id, call_context).is_some() {
panic!("Duplicate http call with id={}", id);
}
self.active_http_calls().increment(1);
}
fn callouts(&self) -> &RefCell<HashMap<u32, Self::CallContext>>;
fn active_http_calls(&self) -> &Gauge;
}

View file

@ -1,20 +0,0 @@
use filter_context::FilterContext;
use proxy_wasm::traits::*;
use proxy_wasm::types::*;
mod consts;
mod filter_context;
mod http;
mod llm_providers;
mod ratelimit;
mod routing;
mod stats;
mod stream_context;
mod tokenizer;
proxy_wasm::main! {{
proxy_wasm::set_log_level(LogLevel::Trace);
proxy_wasm::set_root_context(|_| -> Box<dyn RootContext> {
Box::new(FilterContext::new())
});
}}

View file

@ -1,69 +0,0 @@
use public_types::configuration::LlmProvider;
use std::collections::HashMap;
use std::rc::Rc;
#[derive(Debug)]
pub struct LlmProviders {
providers: HashMap<String, Rc<LlmProvider>>,
default: Option<Rc<LlmProvider>>,
}
impl LlmProviders {
pub fn iter(&self) -> std::collections::hash_map::Iter<'_, String, Rc<LlmProvider>> {
self.providers.iter()
}
pub fn default(&self) -> Option<Rc<LlmProvider>> {
self.default.as_ref().map(|rc| rc.clone())
}
pub fn get(&self, name: &str) -> Option<Rc<LlmProvider>> {
self.providers.get(name).cloned()
}
}
#[derive(thiserror::Error, Debug)]
pub enum LlmProvidersNewError {
#[error("There must be at least one LLM Provider")]
EmptySource,
#[error("There must be at most one default LLM Provider")]
MoreThanOneDefault,
#[error("\'{0}\' is not a unique name")]
DuplicateName(String),
}
impl TryFrom<Vec<LlmProvider>> for LlmProviders {
type Error = LlmProvidersNewError;
fn try_from(llm_providers_config: Vec<LlmProvider>) -> Result<Self, Self::Error> {
if llm_providers_config.is_empty() {
return Err(LlmProvidersNewError::EmptySource);
}
let mut llm_providers = LlmProviders {
providers: HashMap::new(),
default: None,
};
for llm_provider in llm_providers_config {
let llm_provider: Rc<LlmProvider> = Rc::new(llm_provider);
if llm_provider.default.unwrap_or_default() {
match llm_providers.default {
Some(_) => return Err(LlmProvidersNewError::MoreThanOneDefault),
None => llm_providers.default = Some(Rc::clone(&llm_provider)),
}
}
// Insert and check that there is no other provider with the same name.
let name = llm_provider.name.clone();
if llm_providers
.providers
.insert(name.clone(), llm_provider)
.is_some()
{
return Err(LlmProvidersNewError::DuplicateName(name));
}
}
Ok(llm_providers)
}
}

View file

@ -1,450 +0,0 @@
use governor::{DefaultKeyedRateLimiter, InsufficientCapacity, Quota};
use log::debug;
use public_types::configuration;
use public_types::configuration::{Limit, Ratelimit, TimeUnit};
use std::fmt::Display;
use std::num::{NonZero, NonZeroU32};
use std::sync::RwLock;
use std::{collections::HashMap, sync::OnceLock};
pub type RatelimitData = RwLock<RatelimitMap>;
pub fn ratelimits(ratelimits_config: Option<Vec<Ratelimit>>) -> &'static RatelimitData {
static RATELIMIT_DATA: OnceLock<RatelimitData> = OnceLock::new();
RATELIMIT_DATA.get_or_init(|| {
RwLock::new(RatelimitMap::new(
ratelimits_config.expect("The initialization call has to have passed a config"),
))
})
}
// The Data Structure is laid out in the following way:
// Provider -> Hash { Header -> Limit }.
// If the Header used to configure the given Limit:
// a) Has None value, then there will be N Limit keyed by the Header value.
// b) Has Some() value, then there will be 1 Limit keyed by the empty string.
// It would have been nicer to use a non-keyed limit for b). However, the type system made that option a nightmare.
pub struct RatelimitMap {
datastore: HashMap<String, HashMap<configuration::Header, DefaultKeyedRateLimiter<String>>>,
}
// This version of Header demands that the user passes a header value to match on.
#[derive(Debug, Clone)]
pub struct Header {
pub key: String,
pub value: String,
}
impl Display for Header {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{self:?}")
}
}
impl From<Header> for configuration::Header {
fn from(header: Header) -> Self {
Self {
key: header.key,
value: Some(header.value),
}
}
}
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("exceeded limit provider={provider}, selector={selector}, tokens_used={tokens_used}")]
ExceededLimit {
provider: String,
selector: Header,
tokens_used: NonZeroU32,
},
}
impl RatelimitMap {
// n.b new is private so that the only access to the Ratelimits can be done via the static
// reference inside a RwLock via ratelimit::ratelimits().
fn new(ratelimits_config: Vec<Ratelimit>) -> Self {
let mut new_ratelimit_map = RatelimitMap {
datastore: HashMap::new(),
};
for ratelimit_config in ratelimits_config {
let limit = DefaultKeyedRateLimiter::keyed(get_quota(ratelimit_config.limit));
match new_ratelimit_map.datastore.get_mut(&ratelimit_config.model) {
Some(limits) => match limits.get_mut(&ratelimit_config.selector) {
Some(_) => {
panic!("repeated selector. Selectors per provider must be unique")
}
None => {
limits.insert(ratelimit_config.selector, limit);
}
},
None => {
// The provider has not been seen before.
// Insert the provider and a new HashMap with the specified limit
let new_hash_map = HashMap::from([(ratelimit_config.selector, limit)]);
new_ratelimit_map
.datastore
.insert(ratelimit_config.model, new_hash_map);
}
}
}
new_ratelimit_map
}
#[allow(unused)]
pub fn check_limit(
&self,
provider: String,
selector: Header,
tokens_used: NonZeroU32,
) -> Result<(), Error> {
debug!(
"Checking limit for provider={}, with selector={:?}, consuming tokens={:?}",
provider, selector, tokens_used
);
let provider_limits = match self.datastore.get(&provider) {
None => {
// No limit configured for this provider, hence ok.
return Ok(());
}
Some(limit) => limit,
};
let mut config_selector = configuration::Header::from(selector.clone());
let (limit, limit_key) = match provider_limits.get(&config_selector) {
// This is a specific limit, i.e one that was configured with both key, and value.
// Therefore, the key for the internal limit does not matter, and hence the empty string is always returned.
Some(limit) => (limit, String::from("")),
None => {
// Unwrap is ok here because we _know_ the value exists.
let header_key = config_selector.value.take().unwrap();
// Search for less specific limit, i.e, one that was configured without a value, therefore every Header
// value has its own key in the internal limit.
match provider_limits.get(&config_selector) {
Some(limit) => (limit, header_key),
// No limit for that header key, value pair exists within that provider limits.
None => {
return Ok(());
}
}
}
};
match limit.check_key_n(&limit_key, tokens_used) {
Ok(Ok(())) => Ok(()),
Ok(Err(_)) | Err(InsufficientCapacity(_)) => Err(Error::ExceededLimit {
provider,
selector,
tokens_used,
}),
}
}
}
fn get_quota(limit: Limit) -> Quota {
let tokens = NonZero::new(limit.tokens).expect("Limit's tokens must be positive");
match limit.unit {
TimeUnit::Second => Quota::per_second(tokens),
TimeUnit::Minute => Quota::per_minute(tokens),
TimeUnit::Hour => Quota::per_hour(tokens),
}
}
// The following tests are inside the ratelimit module in order to access RatelimitMap::new() in order to provide
// different configuration values per test.
#[test]
fn non_existent_provider_is_ok() {
let ratelimits_config = vec![Ratelimit {
model: String::from("provider"),
selector: configuration::Header {
key: String::from("only-key"),
value: None,
},
limit: Limit {
tokens: 100,
unit: TimeUnit::Minute,
},
}];
let ratelimits = RatelimitMap::new(ratelimits_config);
assert!(ratelimits
.check_limit(
String::from("non-existent-provider"),
Header {
key: String::from("key"),
value: String::from("value"),
},
NonZero::new(5000).unwrap(),
)
.is_ok())
}
#[test]
fn non_existent_key_is_ok() {
let ratelimits_config = vec![Ratelimit {
model: String::from("provider"),
selector: configuration::Header {
key: String::from("only-key"),
value: None,
},
limit: Limit {
tokens: 100,
unit: TimeUnit::Minute,
},
}];
let ratelimits = RatelimitMap::new(ratelimits_config);
assert!(ratelimits
.check_limit(
String::from("provider"),
Header {
key: String::from("key"),
value: String::from("value"),
},
NonZero::new(5000).unwrap(),
)
.is_ok())
}
#[test]
fn specific_limit_does_not_catch_non_specific_value() {
let ratelimits_config = vec![Ratelimit {
model: String::from("provider"),
selector: configuration::Header {
key: String::from("key"),
value: Some(String::from("value")),
},
limit: Limit {
tokens: 200,
unit: TimeUnit::Second,
},
}];
let ratelimits = RatelimitMap::new(ratelimits_config);
assert!(ratelimits
.check_limit(
String::from("provider"),
Header {
key: String::from("key"),
value: String::from("not-the-correct-value"),
},
NonZero::new(5000).unwrap(),
)
.is_ok())
}
#[test]
fn specific_limit_is_hit() {
let ratelimits_config = vec![Ratelimit {
model: String::from("provider"),
selector: configuration::Header {
key: String::from("key"),
value: Some(String::from("value")),
},
limit: Limit {
tokens: 200,
unit: TimeUnit::Hour,
},
}];
let ratelimits = RatelimitMap::new(ratelimits_config);
assert!(ratelimits
.check_limit(
String::from("provider"),
Header {
key: String::from("key"),
value: String::from("value"),
},
NonZero::new(5000).unwrap(),
)
.is_err())
}
#[test]
fn non_specific_key_has_different_limits_for_different_values() {
let ratelimits_config = vec![Ratelimit {
model: String::from("provider"),
selector: configuration::Header {
key: String::from("only-key"),
value: None,
},
limit: Limit {
tokens: 100,
unit: TimeUnit::Hour,
},
}];
let ratelimits = RatelimitMap::new(ratelimits_config);
// Value1 takes 50.
assert!(ratelimits
.check_limit(
String::from("provider"),
Header {
key: String::from("only-key"),
value: String::from("value1"),
},
NonZero::new(50).unwrap(),
)
.is_ok());
// value2 takes 60 because it has its own 100 limit
assert!(ratelimits
.check_limit(
String::from("provider"),
Header {
key: String::from("only-key"),
value: String::from("value2"),
},
NonZero::new(60).unwrap(),
)
.is_ok());
// However value1 cannot take more than 100 per hour which 50+70 = 120
assert!(ratelimits
.check_limit(
String::from("provider"),
Header {
key: String::from("only-key"),
value: String::from("value1"),
},
NonZero::new(70).unwrap(),
)
.is_err())
}
#[test]
fn different_provider_can_have_different_limits_with_the_same_keys() {
let ratelimits_config = vec![
Ratelimit {
model: String::from("first_provider"),
selector: configuration::Header {
key: String::from("key"),
value: Some(String::from("value")),
},
limit: Limit {
tokens: 100,
unit: TimeUnit::Hour,
},
},
Ratelimit {
model: String::from("second_provider"),
selector: configuration::Header {
key: String::from("key"),
value: Some(String::from("value")),
},
limit: Limit {
tokens: 200,
unit: TimeUnit::Hour,
},
},
];
let ratelimits = RatelimitMap::new(ratelimits_config);
assert!(ratelimits
.check_limit(
String::from("first_provider"),
Header {
key: String::from("key"),
value: String::from("value"),
},
NonZero::new(100).unwrap(),
)
.is_ok());
assert!(ratelimits
.check_limit(
String::from("second_provider"),
Header {
key: String::from("key"),
value: String::from("value"),
},
NonZero::new(200).unwrap(),
)
.is_ok());
assert!(ratelimits
.check_limit(
String::from("first_provider"),
Header {
key: String::from("key"),
value: String::from("value"),
},
NonZero::new(1).unwrap(),
)
.is_err());
assert!(ratelimits
.check_limit(
String::from("second_provider"),
Header {
key: String::from("key"),
value: String::from("value"),
},
NonZero::new(1).unwrap(),
)
.is_err());
}
// These tests use the publicly exposed static singleton, thus the same configuration is used in every test.
// If more tests are written here, move the initial call out of the test.
#[cfg(test)]
mod test {
use super::ratelimits;
use configuration::{Limit, Ratelimit, TimeUnit};
use public_types::configuration;
use std::num::NonZero;
use std::thread;
#[test]
fn make_ratelimits_optional() {
let ratelimits_config = Vec::new();
// Initialize in the main thread.
ratelimits(Some(ratelimits_config));
}
#[test]
fn different_threads_have_same_ratelimit_data_structure() {
let ratelimits_config = Some(vec![Ratelimit {
model: String::from("provider"),
selector: configuration::Header {
key: String::from("key"),
value: Some(String::from("value")),
},
limit: Limit {
tokens: 200,
unit: TimeUnit::Hour,
},
}]);
// Initialize in the main thread.
ratelimits(ratelimits_config);
// Use the singleton in a different thread.
thread::spawn(|| {
let ratelimits = ratelimits(None);
assert!(ratelimits
.read()
.unwrap()
.check_limit(
String::from("provider"),
super::Header {
key: String::from("key"),
value: String::from("value"),
},
NonZero::new(5000).unwrap(),
)
.is_err())
});
}
}

View file

@ -1,50 +0,0 @@
use std::rc::Rc;
use crate::llm_providers::LlmProviders;
use log::debug;
use public_types::configuration::LlmProvider;
use rand::{seq::IteratorRandom, thread_rng};
#[derive(Debug)]
pub enum ProviderHint {
Default,
Name(String),
}
impl From<String> for ProviderHint {
fn from(value: String) -> Self {
match value.as_str() {
"default" => ProviderHint::Default,
_ => ProviderHint::Name(value),
}
}
}
pub fn get_llm_provider(
llm_providers: &LlmProviders,
provider_hint: Option<ProviderHint>,
) -> Rc<LlmProvider> {
let maybe_provider = provider_hint.and_then(|hint| match hint {
ProviderHint::Default => llm_providers.default(),
// FIXME: should a non-existent name in the hint be more explicit? i.e, return a BAD_REQUEST?
ProviderHint::Name(name) => llm_providers.get(&name),
});
if let Some(provider) = maybe_provider {
return provider;
}
if llm_providers.default().is_some() {
debug!("no llm provider found for hint, using default llm provider");
return llm_providers.default().unwrap();
}
debug!("no default llm found, using random llm provider");
let mut rng = thread_rng();
llm_providers
.iter()
.choose(&mut rng)
.expect("There should always be at least one llm provider")
.1
.clone()
}

View file

@ -1,103 +0,0 @@
use log::error;
use proxy_wasm::hostcalls;
use proxy_wasm::types::*;
#[allow(unused)]
pub trait Metric {
fn id(&self) -> u32;
fn value(&self) -> Result<u64, String> {
match hostcalls::get_metric(self.id()) {
Ok(value) => Ok(value),
Err(Status::NotFound) => Err(format!("metric not found: {}", self.id())),
Err(err) => Err(format!("unexpected status: {:?}", err)),
}
}
}
#[allow(unused)]
pub trait IncrementingMetric: Metric {
fn increment(&self, offset: i64) {
match hostcalls::increment_metric(self.id(), offset) {
Ok(_) => (),
Err(err) => error!("error incrementing metric: {:?}", err),
}
}
}
#[allow(unused)]
pub trait RecordingMetric: Metric {
fn record(&self, value: u64) {
match hostcalls::record_metric(self.id(), value) {
Ok(_) => (),
Err(err) => error!("error recording metric: {:?}", err),
}
}
}
#[derive(Copy, Clone, Debug)]
pub struct Counter {
id: u32,
}
#[allow(unused)]
impl Counter {
pub fn new(name: String) -> Counter {
let returned_id = hostcalls::define_metric(MetricType::Counter, &name)
.expect("failed to define counter '{}', name");
Counter { id: returned_id }
}
}
impl Metric for Counter {
fn id(&self) -> u32 {
self.id
}
}
impl IncrementingMetric for Counter {}
#[derive(Copy, Clone, Debug)]
pub struct Gauge {
id: u32,
}
impl Gauge {
pub fn new(name: String) -> Gauge {
let returned_id = hostcalls::define_metric(MetricType::Gauge, &name)
.expect("failed to define gauge '{}', name");
Gauge { id: returned_id }
}
}
impl Metric for Gauge {
fn id(&self) -> u32 {
self.id
}
}
/// For state of the world updates
impl RecordingMetric for Gauge {}
/// For offset deltas
impl IncrementingMetric for Gauge {}
#[derive(Copy, Clone)]
pub struct Histogram {
id: u32,
}
#[allow(unused)]
impl Histogram {
pub fn new(name: String) -> Histogram {
let returned_id = hostcalls::define_metric(MetricType::Histogram, &name)
.expect("failed to define histogram '{}', name");
Histogram { id: returned_id }
}
}
impl Metric for Histogram {
fn id(&self) -> u32 {
self.id
}
}
impl RecordingMetric for Histogram {}

File diff suppressed because it is too large Load diff

View file

@ -1,39 +0,0 @@
use log::debug;
#[derive(Debug, PartialEq, Eq)]
#[allow(dead_code)]
pub enum Error {
UnknownModel,
FailedToTokenize,
}
#[allow(dead_code)]
pub fn token_count(model_name: &str, text: &str) -> Result<usize, Error> {
debug!("getting token count model={}", model_name);
// Consideration: is it more expensive to instantiate the BPE object every time, or to contend the singleton?
let bpe = tiktoken_rs::get_bpe_from_model(model_name).map_err(|_| Error::UnknownModel)?;
Ok(bpe.encode_ordinary(text).len())
}
#[cfg(test)]
mod test {
use super::*;
#[test]
fn encode_ordinary() {
let model_name = "gpt-3.5-turbo";
let text = "How many tokens does this sentence have?";
assert_eq!(
8,
token_count(model_name, text).expect("correct tokenization")
);
}
#[test]
fn unrecognized_model() {
assert_eq!(
Error::UnknownModel,
token_count("unknown", "").expect_err("unknown model")
)
}
}

View file

@ -1,805 +0,0 @@
use http::StatusCode;
use proxy_wasm_test_framework::tester::{self, Tester};
use proxy_wasm_test_framework::types::{
Action, BufferType, LogLevel, MapType, MetricType, ReturnType,
};
use public_types::common_types::open_ai::{ChatCompletionsResponse, Choice, Message, Usage};
use public_types::common_types::open_ai::{FunctionCallDetail, ToolCall, ToolType};
use public_types::common_types::{HallucinationClassificationResponse, PromptGuardResponse};
use public_types::embeddings::{
create_embedding_response, embedding, CreateEmbeddingResponse, CreateEmbeddingResponseUsage,
Embedding,
};
use public_types::{common_types::ZeroShotClassificationResponse, configuration::Configuration};
use serde_yaml::Value;
use serial_test::serial;
use std::collections::HashMap;
use std::path::Path;
fn wasm_module() -> String {
let wasm_file = Path::new("target/wasm32-wasi/release/intelligent_prompt_gateway.wasm");
assert!(
wasm_file.exists(),
"Run `cargo build --release --target=wasm32-wasi` first"
);
wasm_file.to_str().unwrap().to_string()
}
fn request_headers_expectations(module: &mut Tester, http_context: i32) {
module
.call_proxy_on_request_headers(http_context, 0, false)
.expect_get_header_map_value(
Some(MapType::HttpRequestHeaders),
Some("x-arch-llm-provider-hint"),
)
.returning(Some("default"))
.expect_log(Some(LogLevel::Debug), None)
.expect_add_header_map_value(
Some(MapType::HttpRequestHeaders),
Some("x-arch-upstream"),
Some("arch_llm_listener"),
)
.expect_add_header_map_value(
Some(MapType::HttpRequestHeaders),
Some("x-arch-llm-provider"),
Some("open-ai-gpt-4"),
)
.expect_replace_header_map_value(
Some(MapType::HttpRequestHeaders),
Some("Authorization"),
Some("Bearer secret_key"),
)
.expect_remove_header_map_value(Some(MapType::HttpRequestHeaders), Some("content-length"))
.expect_get_header_map_value(
Some(MapType::HttpRequestHeaders),
Some("x-arch-ratelimit-selector"),
)
.returning(Some("selector-key"))
.expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("selector-key"))
.returning(Some("selector-value"))
.expect_get_header_map_pairs(Some(MapType::HttpRequestHeaders))
.returning(None)
.expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":path"))
.returning(Some("/v1/chat/completions"))
.expect_log(Some(LogLevel::Debug), None)
.expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("x-request-id"))
.returning(None)
.execute_and_expect(ReturnType::Action(Action::Continue))
.unwrap();
}
fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) {
module
.call_proxy_on_context_create(http_context, filter_context)
.expect_log(Some(LogLevel::Debug), None)
.execute_and_expect(ReturnType::None)
.unwrap();
request_headers_expectations(module, http_context);
// Request Body
let chat_completions_request_body = "\
{\
\"messages\": [\
{\
\"role\": \"system\",\
\"content\": \"You are a poetic assistant, skilled in explaining complex programming concepts with creative flair.\"\
},\
{\
\"role\": \"user\",\
\"content\": \"Compose a poem that explains the concept of recursion in programming.\"\
}\
],\
\"model\": \"gpt-4\"\
}";
module
.call_proxy_on_request_body(
http_context,
chat_completions_request_body.len() as i32,
true,
)
.expect_get_buffer_bytes(Some(BufferType::HttpRequestBody))
.returning(Some(chat_completions_request_body))
// The actual call is not important in this test, we just need to grab the token_id
.expect_http_call(
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/guard"),
(":authority", "model_server"),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
]),
None,
None,
None,
)
.returning(Some(1))
.expect_log(Some(LogLevel::Debug), None)
.expect_metric_increment("active_http_calls", 1)
.execute_and_expect(ReturnType::Action(Action::Pause))
.unwrap();
let prompt_guard_response = PromptGuardResponse {
toxic_prob: None,
toxic_verdict: None,
jailbreak_prob: None,
jailbreak_verdict: None,
};
let prompt_guard_response_buffer = serde_json::to_string(&prompt_guard_response).unwrap();
module
.call_proxy_on_http_call_response(
http_context,
1,
0,
prompt_guard_response_buffer.len() as i32,
0,
)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&prompt_guard_response_buffer))
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/embeddings"),
(":authority", "model_server"),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
]),
None,
None,
None,
)
.returning(Some(2))
.expect_metric_increment("active_http_calls", 1)
.expect_log(Some(LogLevel::Debug), None)
.execute_and_expect(ReturnType::None)
.unwrap();
let embedding_response = CreateEmbeddingResponse {
data: vec![Embedding {
index: 0,
embedding: vec![],
object: embedding::Object::default(),
}],
model: String::from("test"),
object: create_embedding_response::Object::default(),
usage: Box::new(CreateEmbeddingResponseUsage::new(0, 0)),
};
let embeddings_response_buffer = serde_json::to_string(&embedding_response).unwrap();
module
.call_proxy_on_http_call_response(
http_context,
2,
0,
embeddings_response_buffer.len() as i32,
0,
)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&embeddings_response_buffer))
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/zeroshot"),
(":authority", "model_server"),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
]),
None,
None,
None,
)
.returning(Some(3))
.expect_metric_increment("active_http_calls", 1)
.expect_log(Some(LogLevel::Debug), None)
.execute_and_expect(ReturnType::None)
.unwrap();
let zero_shot_response = ZeroShotClassificationResponse {
predicted_class: "weather_forecast".to_string(),
predicted_class_score: 0.1,
scores: HashMap::new(),
model: "test-model".to_string(),
};
let zeroshot_intent_detection_buffer = serde_json::to_string(&zero_shot_response).unwrap();
module
.call_proxy_on_http_call_response(
http_context,
3,
0,
zeroshot_intent_detection_buffer.len() as i32,
0,
)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&zeroshot_intent_detection_buffer))
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Info), None)
.expect_http_call(
Some("arch_internal"),
Some(vec![
(":method", "POST"),
("x-arch-upstream", "arch_fc"),
(":path", "/v1/chat/completions"),
(":authority", "arch_fc"),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "120000"),
]),
None,
None,
None,
)
.returning(Some(4))
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_metric_increment("active_http_calls", 1)
.execute_and_expect(ReturnType::None)
.unwrap();
}
fn setup_filter(module: &mut Tester, config: &str) -> i32 {
let filter_context = 1;
module
.call_proxy_on_context_create(filter_context, 0)
.expect_metric_creation(MetricType::Gauge, "active_http_calls")
.expect_metric_creation(MetricType::Counter, "ratelimited_rq")
.execute_and_expect(ReturnType::None)
.unwrap();
module
.call_proxy_on_configure(filter_context, config.len() as i32)
.expect_get_buffer_bytes(Some(BufferType::PluginConfiguration))
.returning(Some(config))
.execute_and_expect(ReturnType::Bool(true))
.unwrap();
module
.call_proxy_on_tick(filter_context)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/embeddings"),
(":authority", "model_server"),
("content-type", "application/json"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
]),
None,
None,
None,
)
.returning(Some(101))
.expect_metric_increment("active_http_calls", 1)
.expect_set_tick_period_millis(Some(0))
.execute_and_expect(ReturnType::None)
.unwrap();
let embedding_response = CreateEmbeddingResponse {
data: vec![Embedding {
embedding: vec![],
index: 0,
object: embedding::Object::default(),
}],
model: String::from("test"),
object: create_embedding_response::Object::default(),
usage: Box::new(CreateEmbeddingResponseUsage {
prompt_tokens: 0,
total_tokens: 0,
}),
};
let embedding_response_str = serde_json::to_string(&embedding_response).unwrap();
module
.call_proxy_on_http_call_response(
filter_context,
101,
0,
embedding_response_str.len() as i32,
0,
)
.expect_log(
Some(LogLevel::Debug),
Some(
format!(
"filter_context: on_http_call_response called with token_id: {:?}",
101
)
.as_str(),
),
)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&embedding_response_str))
.expect_log(Some(LogLevel::Debug), None)
.execute_and_expect(ReturnType::None)
.unwrap();
filter_context
}
fn default_config() -> &'static str {
r#"
version: "0.1-beta"
listener:
address: 0.0.0.0
port: 10000
message_format: huggingface
connect_timeout: 0.005s
endpoints:
api_server:
endpoint: api_server:80
connect_timeout: 0.005s
llm_providers:
- name: open-ai-gpt-4
provider: openai
access_key: secret_key
model: gpt-4
default: true
overrides:
# confidence threshold for prompt target intent matching
prompt_target_intent_matching_threshold: 0.6
system_prompt: |
You are a helpful assistant.
prompt_guards:
input_guards:
jailbreak:
on_exception:
message: "Looks like you're curious about my abilities, but I can only provide assistance within my programmed parameters."
prompt_targets:
- name: weather_forecast
description: This function provides realtime weather forecast information for a given city.
parameters:
- name: city
required: true
description: The city for which the weather forecast is requested.
- name: days
description: The number of days for which the weather forecast is requested.
- name: units
description: The units in which the weather forecast is requested.
endpoint:
name: api_server
path: /weather
system_prompt: |
You are a helpful weather forecaster. Use weater data that is provided to you. Please following following guidelines when responding to user queries:
- Use farenheight for temperature
- Use miles per hour for wind speed
ratelimits:
- model: gpt-4
selector:
key: selector-key
value: selector-value
limit:
tokens: 1
unit: minute
"#
}
#[test]
#[serial]
fn successful_request_to_open_ai_chat_completions() {
let args = tester::MockSettings {
wasm_path: wasm_module(),
quiet: false,
allow_unexpected: false,
};
let mut module = tester::mock(args).unwrap();
module
.call_start()
.execute_and_expect(ReturnType::None)
.unwrap();
// Setup Filter
let filter_context = setup_filter(&mut module, default_config());
// Setup HTTP Stream
let http_context = 2;
module
.call_proxy_on_context_create(http_context, filter_context)
.expect_log(Some(LogLevel::Debug), None)
.execute_and_expect(ReturnType::None)
.unwrap();
request_headers_expectations(&mut module, http_context);
// Request Body
let chat_completions_request_body = "\
{\
\"messages\": [\
{\
\"role\": \"system\",\
\"content\": \"You are a poetic assistant, skilled in explaining complex programming concepts with creative flair.\"\
},\
{\
\"role\": \"user\",\
\"content\": \"Compose a poem that explains the concept of recursion in programming.\"\
}\
],\
\"model\": \"gpt-4\"\
}";
module
.call_proxy_on_request_body(
http_context,
chat_completions_request_body.len() as i32,
true,
)
.expect_get_buffer_bytes(Some(BufferType::HttpRequestBody))
.returning(Some(chat_completions_request_body))
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(Some("arch_internal"), None, None, None, None)
.returning(Some(4))
.expect_metric_increment("active_http_calls", 1)
.execute_and_expect(ReturnType::Action(Action::Pause))
.unwrap();
}
#[test]
#[serial]
fn bad_request_to_open_ai_chat_completions() {
let args = tester::MockSettings {
wasm_path: wasm_module(),
quiet: false,
allow_unexpected: false,
};
let mut module = tester::mock(args).unwrap();
module
.call_start()
.execute_and_expect(ReturnType::None)
.unwrap();
// Setup Filter
let filter_context = setup_filter(&mut module, default_config());
// Setup HTTP Stream
let http_context = 2;
module
.call_proxy_on_context_create(http_context, filter_context)
.expect_log(Some(LogLevel::Debug), None)
.execute_and_expect(ReturnType::None)
.unwrap();
request_headers_expectations(&mut module, http_context);
// Request Body
let incomplete_chat_completions_request_body = "\
{\
\"messages\": [\
{\
\"role\": \"system\",\
},\
{\
\"role\": \"user\",\
\"content\": \"Compose a poem that explains the concept of recursion in programming.\"\
}\
]\
}";
module
.call_proxy_on_request_body(
http_context,
incomplete_chat_completions_request_body.len() as i32,
true,
)
.expect_get_buffer_bytes(Some(BufferType::HttpRequestBody))
.returning(Some(incomplete_chat_completions_request_body))
.expect_log(Some(LogLevel::Debug), None)
.expect_send_local_response(
Some(StatusCode::BAD_REQUEST.as_u16().into()),
None,
None,
None,
)
.execute_and_expect(ReturnType::Action(Action::Pause))
.unwrap();
}
#[test]
#[serial]
fn request_ratelimited() {
let args = tester::MockSettings {
wasm_path: wasm_module(),
quiet: false,
allow_unexpected: false,
};
let mut module = tester::mock(args).unwrap();
module
.call_start()
.execute_and_expect(ReturnType::None)
.unwrap();
// Setup Filter
let filter_context = setup_filter(&mut module, default_config());
// Setup HTTP Stream
let http_context = 2;
normal_flow(&mut module, filter_context, http_context);
let arch_fc_resp = ChatCompletionsResponse {
usage: Some(Usage {
completion_tokens: 0,
}),
choices: vec![Choice {
finish_reason: "test".to_string(),
index: 0,
message: Message {
role: "system".to_string(),
content: None,
tool_calls: Some(vec![ToolCall {
id: String::from("test"),
tool_type: ToolType::Function,
function: FunctionCallDetail {
name: String::from("weather_forecast"),
arguments: HashMap::from([(
String::from("city"),
Value::String(String::from("seattle")),
)]),
},
}]),
model: None,
},
}],
model: String::from("test"),
metadata: None,
};
let arch_fc_resp_str = serde_json::to_string(&arch_fc_resp).unwrap();
module
.call_proxy_on_http_call_response(http_context, 4, 0, arch_fc_resp_str.len() as i32, 0)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&arch_fc_resp_str))
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/hallucination"),
(":authority", "model_server"),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
]),
None,
None,
None,
)
.returning(Some(5))
.expect_metric_increment("active_http_calls", 1)
.execute_and_expect(ReturnType::None)
.unwrap();
let hallucatination_body = HallucinationClassificationResponse {
params_scores: HashMap::from([("city".to_string(), 0.99)]),
model: "nli-model".to_string(),
};
let body_text = serde_json::to_string(&hallucatination_body).unwrap();
module
.call_proxy_on_http_call_response(http_context, 5, 0, body_text.len() as i32, 0)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&body_text))
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "api_server"),
(":method", "POST"),
(":path", "/weather"),
(":authority", "api_server"),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
]),
None,
None,
None,
)
.returning(Some(6))
.expect_metric_increment("active_http_calls", 1)
.execute_and_expect(ReturnType::None)
.unwrap();
let body_text = String::from("test body");
module
.call_proxy_on_http_call_response(http_context, 6, 0, body_text.len() as i32, 0)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&body_text))
.expect_get_header_map_value(Some(MapType::HttpCallResponseHeaders), Some(":status"))
.returning(Some("200"))
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_send_local_response(
Some(StatusCode::TOO_MANY_REQUESTS.as_u16().into()),
None,
None,
None,
)
.expect_metric_increment("ratelimited_rq", 1)
.execute_and_expect(ReturnType::None)
.unwrap();
}
#[test]
#[serial]
fn request_not_ratelimited() {
let args = tester::MockSettings {
wasm_path: wasm_module(),
quiet: false,
allow_unexpected: false,
};
let mut module = tester::mock(args).unwrap();
module
.call_start()
.execute_and_expect(ReturnType::None)
.unwrap();
// Setup Filter
let mut config: Configuration = serde_yaml::from_str(default_config()).unwrap();
config.ratelimits.as_mut().unwrap()[0].limit.tokens += 1000;
let config_str = serde_json::to_string(&config).unwrap();
let filter_context = setup_filter(&mut module, &config_str);
// Setup HTTP Stream
let http_context = 2;
normal_flow(&mut module, filter_context, http_context);
let arch_fc_resp = ChatCompletionsResponse {
usage: Some(Usage {
completion_tokens: 0,
}),
choices: vec![Choice {
finish_reason: "test".to_string(),
index: 0,
message: Message {
role: "system".to_string(),
content: None,
tool_calls: Some(vec![ToolCall {
id: String::from("test"),
tool_type: ToolType::Function,
function: FunctionCallDetail {
name: String::from("weather_forecast"),
arguments: HashMap::from([(
String::from("city"),
Value::String(String::from("seattle")),
)]),
},
}]),
model: None,
},
}],
model: String::from("test"),
metadata: None,
};
let arch_fc_resp_str = serde_json::to_string(&arch_fc_resp).unwrap();
module
.call_proxy_on_http_call_response(http_context, 4, 0, arch_fc_resp_str.len() as i32, 0)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&arch_fc_resp_str))
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/hallucination"),
(":authority", "model_server"),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
]),
None,
None,
None,
)
.returning(Some(5))
.expect_metric_increment("active_http_calls", 1)
.execute_and_expect(ReturnType::None)
.unwrap();
// hallucination should return that parameters were not halliucinated
// prompt: str
// parameters: dict
// model: str
let hallucatination_body = HallucinationClassificationResponse {
params_scores: HashMap::from([("city".to_string(), 0.99)]),
model: "nli-model".to_string(),
};
let body_text = serde_json::to_string(&hallucatination_body).unwrap();
module
.call_proxy_on_http_call_response(http_context, 5, 0, body_text.len() as i32, 0)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&body_text))
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "api_server"),
(":method", "POST"),
(":path", "/weather"),
(":authority", "api_server"),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
]),
None,
None,
None,
)
.returning(Some(6))
.expect_metric_increment("active_http_calls", 1)
.execute_and_expect(ReturnType::None)
.unwrap();
let body_text = String::from("test body");
module
.call_proxy_on_http_call_response(http_context, 6, 0, body_text.len() as i32, 0)
.expect_metric_increment("active_http_calls", -1)
.expect_get_buffer_bytes(Some(BufferType::HttpCallResponseBody))
.returning(Some(&body_text))
.expect_get_header_map_value(Some(MapType::HttpCallResponseHeaders), Some(":status"))
.returning(Some("200"))
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_set_buffer_bytes(Some(BufferType::HttpRequestBody), None)
.execute_and_expect(ReturnType::None)
.unwrap();
}