Split listener (#141)

This commit is contained in:
Adil Hafeez 2024-10-08 16:24:08 -07:00 committed by GitHub
parent 22bc3d2798
commit 285aa1419b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 449 additions and 335 deletions

1
.gitignore vendored
View file

@ -29,3 +29,4 @@ model_server/model_server.egg-info
model_server/venv_model_server
model_server/build
model_server/dist
arch_logs/

View file

@ -144,6 +144,12 @@ properties:
- model
- selector
- limit
tracing:
type: object
properties:
random_sampling:
type: integer
additionalProperties: false
additionalProperties: false
required:
- version

View file

@ -1,3 +0,0 @@
RUST_VERSION=1.80.0
docker run --rm -v rustup_cache:/usr/local/rustup/ rust:$RUST_VERSION rustup -v target add wasm32-wasi
docker run --rm -v $PWD/../open-message-format:/code/open-message-format -v ~/.cargo:/root/.cargo -v $(pwd):/code/arch -w /code/arch -v rustup_cache:/usr/local/rustup/ rust:$RUST_VERSION cargo build --release --target wasm32-wasi

View file

@ -0,0 +1 @@
docker build -t archgw .. -f Dockerfile

View file

@ -3,11 +3,15 @@ services:
image: archgw:latest
ports:
- "10000:10000"
- "11000:11000"
- "19901:9901"
volumes:
- ${ARCH_CONFIG_FILE:-../demos/function_calling/arch_config.yaml}:/config/arch_config.yaml
- /etc/ssl/cert.pem:/etc/ssl/cert.pem
- ./envoy.template.dev.yaml:/config/envoy.template.yaml
- ./envoy.template.yaml:/config/envoy.template.yaml
- ./target/wasm32-wasi/release/intelligent_prompt_gateway.wasm:/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm
- ./arch_config_schema.yaml:/config/arch_config_schema.yaml
- ./tools/config_generator.py:/config/config_generator.py
- ./arch_logs:/var/log/
env_file:
- stage.env

View file

@ -1,179 +0,0 @@
admin:
address:
socket_address: { address: 0.0.0.0, port_value: 9901 }
static_resources:
listeners:
address:
socket_address:
address: 0.0.0.0
port_value: 10000
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: arch_ingress_http
codec_type: AUTO
scheme_header_transformation:
scheme_to_overwrite: https
access_log:
- name: envoy.access_loggers.file
typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
path: "/var/log/arch_access.log"
route_config:
name: local_routes
virtual_hosts:
- name: local_service
domains:
- "*"
routes:
- match:
prefix: "/mistral/v1/chat/completions"
route:
auto_host_rewrite: true
cluster: mistral_7b_instruct
timeout: 60s
{% for provider in arch_llm_providers %}
- match:
prefix: "/"
headers:
- name: "x-arch-llm-provider"
string_match:
exact: {{ provider.name }}
route:
auto_host_rewrite: true
cluster: {{ provider.provider }}
timeout: 60s
{% endfor %}
http_filters:
- name: envoy.filters.http.wasm
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm
value:
config:
name: "http_config"
configuration:
"@type": "type.googleapis.com/google.protobuf.StringValue"
value: |
{{ arch_config | indent(30) }}
vm_config:
runtime: "envoy.wasm.runtime.v8"
code:
local:
filename: "/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm"
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: openai
connect_timeout: 5s
type: LOGICAL_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: openai
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: api.openai.com
port_value: 443
hostname: "api.openai.com"
transport_socket:
name: envoy.transport_sockets.tls
typed_config:
"@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext
sni: api.openai.com
common_tls_context:
tls_params:
tls_minimum_protocol_version: TLSv1_2
tls_maximum_protocol_version: TLSv1_3
- name: mistral
connect_timeout: 5s
type: LOGICAL_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: mistral
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: api.mistral.ai
port_value: 443
hostname: "api.mistral.ai"
transport_socket:
name: envoy.transport_sockets.tls
typed_config:
"@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext
sni: api.mistral.ai
- name: model_server
connect_timeout: 5s
type: STRICT_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: model_server
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: host.docker.internal
port_value: 51000
hostname: "model_server"
- name: mistral_7b_instruct
connect_timeout: 5s
type: STRICT_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: mistral_7b_instruct
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: mistral_7b_instruct
port_value: 10001
hostname: "mistral_7b_instruct"
- name: arch_fc
connect_timeout: 5s
type: STRICT_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: arch_fc
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: host.docker.internal
port_value: 51000
hostname: "arch_fc"
{% for _, cluster in arch_clusters.items() %}
- name: {{ cluster.name }}
{% if cluster.connect_timeout -%}
connect_timeout: {{ cluster.connect_timeout }}
{% else -%}
connect_timeout: 5s
{% endif -%}
type: LOGICAL_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: {{ cluster.name }}
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: {{ cluster.endpoint }}
port_value: {{ cluster.port }}
hostname: {{ cluster.name }}
{% endfor %}

View file

@ -3,69 +3,165 @@ admin:
socket_address: { address: 0.0.0.0, port_value: 9901 }
static_resources:
listeners:
address:
socket_address:
address: 0.0.0.0
port_value: 10000
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: arch_ingress_http
codec_type: AUTO
scheme_header_transformation:
scheme_to_overwrite: https
access_log:
- name: envoy.access_loggers.file
typed_config:
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
path: "/var/log/arch_access.log"
route_config:
name: local_routes
virtual_hosts:
- name: local_service
domains:
- "*"
routes:
- match:
prefix: "/mistral/v1/chat/completions"
route:
auto_host_rewrite: true
cluster: mistral_7b_instruct
timeout: 60s
{% for provider in arch_llm_providers %}
- match:
prefix: "/"
headers:
- name: "x-arch-llm-provider"
string_match:
exact: {{ provider.name }}
route:
auto_host_rewrite: true
cluster: {{ provider.provider }}
timeout: 60s
{% endfor %}
http_filters:
- name: envoy.filters.http.wasm
- name: arch_listener_http
address:
socket_address:
address: 0.0.0.0
port_value: 10000
traffic_direction: INBOUND
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
{% if arch_tracing.random_sampling > 0 %}
generate_request_id: true
tracing:
provider:
name: envoy.tracers.opentelemetry
typed_config:
"@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
grpc_service:
envoy_grpc:
cluster_name: opentelemetry_collector
timeout: 0.250s
service_name: arch
random_sampling:
value: {{ arch_tracing.random_sampling }}
{% endif %}
stat_prefix: arch_listener_http
codec_type: AUTO
scheme_header_transformation:
scheme_to_overwrite: https
access_log:
- name: envoy.access_loggers.file
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm
value:
config:
name: "http_config"
configuration:
"@type": "type.googleapis.com/google.protobuf.StringValue"
value: |
{{ arch_config | indent(30) }}
vm_config:
runtime: "envoy.wasm.runtime.v8"
code:
local:
filename: "/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm"
- name: envoy.filters.http.router
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
path: "/var/log/arch_access.log"
route_config:
name: local_routes
virtual_hosts:
- name: local_service
domains:
- "*"
routes:
{% for provider in arch_llm_providers %}
- match:
prefix: "/"
headers:
- name: "x-arch-llm-provider"
string_match:
exact: {{ provider.name }}
route:
auto_host_rewrite: true
cluster: {{ provider.provider }}
timeout: 60s
{% endfor %}
- match:
prefix: "/"
direct_response:
status: 400
body:
inline_string: "x-arch-llm-provider header not set, cannot perform routing\n"
http_filters:
- name: envoy.filters.http.wasm
typed_config:
"@type": type.googleapis.com/udpa.type.v1.TypedStruct
type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm
value:
config:
name: "http_config"
configuration:
"@type": "type.googleapis.com/google.protobuf.StringValue"
value: |
{{ arch_config | indent(32) }}
vm_config:
runtime: "envoy.wasm.runtime.v8"
code:
local:
filename: "/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm"
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
- name: arch_internal
address:
socket_address:
address: 0.0.0.0
port_value: 11000
traffic_direction: OUTBOUND
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
{% if arch_tracing.random_sampling > 0 %}
generate_request_id: true
tracing:
provider:
name: envoy.tracers.opentelemetry
typed_config:
"@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig
grpc_service:
envoy_grpc:
cluster_name: opentelemetry_collector
timeout: 0.250s
service_name: arch
random_sampling:
value: {{ arch_tracing.random_sampling }}
{% endif %}
stat_prefix: arch_internal
codec_type: AUTO
scheme_header_transformation:
scheme_to_overwrite: https
access_log:
- name: envoy.access_loggers.file
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
"@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog
path: "/var/log/arch_access_internal.log"
route_config:
name: local_routes
virtual_hosts:
- name: local_service
domains:
- "*"
routes:
- match:
prefix: "/"
headers:
- name: "x-arch-upstream"
string_match:
exact: model_server
route:
auto_host_rewrite: true
cluster: model_server
timeout: 60s
- match:
prefix: "/"
headers:
- name: "x-arch-upstream"
string_match:
exact: arch_fc
route:
auto_host_rewrite: true
cluster: model_server
timeout: 60s
{% for _, cluster in arch_clusters.items() %}
- match:
prefix: "/"
headers:
- name: "x-arch-upstream"
string_match:
exact: {{ cluster.name }}
route:
auto_host_rewrite: true
cluster: {{ cluster.name }}
timeout: 60s
{% endfor %}
http_filters:
- name: envoy.filters.http.router
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router
clusters:
- name: openai
connect_timeout: 5s
@ -177,3 +273,39 @@ static_resources:
port_value: {{ cluster.port }}
hostname: {{ cluster.name }}
{% endfor %}
- name: arch_internal
connect_timeout: 5s
type: LOGICAL_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: arch_internal
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 0.0.0.0
port_value: 11000
hostname: arch_internal
{% if arch_tracing.random_sampling > 0 %}
- name: opentelemetry_collector
type: STRICT_DNS
dns_lookup_family: V4_ONLY
lb_policy: ROUND_ROBIN
typed_extension_protocol_options:
envoy.extensions.upstreams.http.v3.HttpProtocolOptions:
"@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions
explicit_http_config:
http2_protocol_options: {}
load_assignment:
cluster_name: opentelemetry_collector
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: host.docker.internal
port_value: 4317
{% endif %}

View file

@ -15,3 +15,6 @@ pub const ARCH_PROVIDER_HINT_HEADER: &str = "x-arch-llm-provider-hint";
pub const CHAT_COMPLETIONS_PATH: &str = "v1/chat/completions";
pub const ARCH_STATE_HEADER: &str = "x-arch-state";
pub const ARCH_FC_MODEL_NAME: &str = "Arch-Function-1.5B";
pub const REQUEST_ID_HEADER: &str = "x-request-id";
pub const ARCH_INTERNAL_CLUSTER_NAME: &str = "arch_internal";
pub const ARCH_UPSTREAM_HOST_HEADER: &str = "x-arch-upstream";

View file

@ -1,4 +1,7 @@
use crate::consts::{DEFAULT_EMBEDDING_MODEL, MODEL_SERVER_NAME};
use crate::consts::{
ARCH_INTERNAL_CLUSTER_NAME, ARCH_UPSTREAM_HOST_HEADER, DEFAULT_EMBEDDING_MODEL,
MODEL_SERVER_NAME,
};
use crate::http::{CallArgs, Client};
use crate::llm_providers::LlmProviders;
use crate::ratelimit;
@ -98,9 +101,10 @@ impl FilterContext {
let json_data = serde_json::to_string(&embeddings_input).unwrap();
let call_args = CallArgs::new(
MODEL_SERVER_NAME,
ARCH_INTERNAL_CLUSTER_NAME,
"/embeddings",
vec![
(ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME),
(":method", "POST"),
(":path", "/embeddings"),
(":authority", MODEL_SERVER_NAME),

View file

@ -1,9 +1,9 @@
use crate::consts::{
ARCH_FC_MODEL_NAME, ARCH_FC_REQUEST_TIMEOUT_MS, ARCH_MESSAGES_KEY, ARCH_PROVIDER_HINT_HEADER,
ARCH_ROUTING_HEADER, ARCH_STATE_HEADER, ARC_FC_CLUSTER, CHAT_COMPLETIONS_PATH,
DEFAULT_EMBEDDING_MODEL, DEFAULT_HALLUCINATED_THRESHOLD, DEFAULT_INTENT_MODEL,
DEFAULT_PROMPT_TARGET_THRESHOLD, GPT_35_TURBO, MODEL_SERVER_NAME,
RATELIMIT_SELECTOR_HEADER_KEY, SYSTEM_ROLE, USER_ROLE,
ARCH_FC_MODEL_NAME, ARCH_FC_REQUEST_TIMEOUT_MS, ARCH_INTERNAL_CLUSTER_NAME, ARCH_MESSAGES_KEY,
ARCH_PROVIDER_HINT_HEADER, ARCH_ROUTING_HEADER, ARCH_STATE_HEADER, ARCH_UPSTREAM_HOST_HEADER,
ARC_FC_CLUSTER, CHAT_COMPLETIONS_PATH, DEFAULT_EMBEDDING_MODEL, DEFAULT_HALLUCINATED_THRESHOLD,
DEFAULT_INTENT_MODEL, DEFAULT_PROMPT_TARGET_THRESHOLD, GPT_35_TURBO, MODEL_SERVER_NAME,
RATELIMIT_SELECTOR_HEADER_KEY, REQUEST_ID_HEADER, SYSTEM_ROLE, USER_ROLE,
};
use crate::filter_context::{EmbeddingsStore, WasmMetrics};
use crate::http::{CallArgs, Client, ClientError};
@ -109,9 +109,11 @@ pub struct StreamContext {
prompt_guards: Rc<PromptGuards>,
llm_providers: Rc<LlmProviders>,
llm_provider: Option<Rc<LlmProvider>>,
request_id: Option<String>,
}
impl StreamContext {
#[allow(clippy::too_many_arguments)]
pub fn new(
context_id: u32,
metrics: Rc<WasmMetrics>,
@ -143,6 +145,7 @@ impl StreamContext {
llm_provider: None,
prompt_guards,
overrides,
request_id: None,
}
}
fn llm_provider(&self) -> &LlmProvider {
@ -292,17 +295,24 @@ impl StreamContext {
}
};
let mut headers = vec![
(ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME),
(":method", "POST"),
(":path", "/zeroshot"),
(":authority", MODEL_SERVER_NAME),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
];
if self.request_id.is_some() {
headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap()));
}
let call_args = CallArgs::new(
MODEL_SERVER_NAME,
ARCH_INTERNAL_CLUSTER_NAME,
"/zeroshot",
vec![
(":method", "POST"),
(":path", "/zeroshot"),
(":authority", MODEL_SERVER_NAME),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
],
headers,
Some(json_data.as_bytes()),
vec![],
Duration::from_secs(5),
@ -470,17 +480,25 @@ impl StreamContext {
debug!("no prompt target found with similarity score above threshold, using default prompt target");
let timeout_str = ARCH_FC_REQUEST_TIMEOUT_MS.to_string();
let mut headers = vec![
(":method", "POST"),
(ARCH_UPSTREAM_HOST_HEADER, &upstream_endpoint),
(":path", &upstream_path),
(":authority", &upstream_endpoint),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()),
];
if self.request_id.is_some() {
headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap()));
}
let call_args = CallArgs::new(
&upstream_endpoint,
ARCH_INTERNAL_CLUSTER_NAME,
&upstream_path,
vec![
(":method", "POST"),
(":path", &upstream_path),
(":authority", &upstream_endpoint),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()),
],
headers,
Some(arch_messages_json.as_bytes()),
vec![],
Duration::from_secs(5),
@ -578,17 +596,25 @@ impl StreamContext {
};
let timeout_str = ARCH_FC_REQUEST_TIMEOUT_MS.to_string();
let mut headers = vec![
(":method", "POST"),
(ARCH_UPSTREAM_HOST_HEADER, ARC_FC_CLUSTER),
(":path", "/v1/chat/completions"),
(":authority", ARC_FC_CLUSTER),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()),
];
if self.request_id.is_some() {
headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap()));
}
let call_args = CallArgs::new(
ARC_FC_CLUSTER,
ARCH_INTERNAL_CLUSTER_NAME,
"/v1/chat/completions",
vec![
(":method", "POST"),
(":path", "/v1/chat/completions"),
(":authority", ARC_FC_CLUSTER),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()),
],
headers,
Some(msg_body.as_bytes()),
vec![],
Duration::from_secs(5),
@ -693,17 +719,25 @@ impl StreamContext {
return self.send_server_error(ServerError::Serialization(error), None);
}
};
let mut headers = vec![
(ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME),
(":method", "POST"),
(":path", "/hallucination"),
(":authority", MODEL_SERVER_NAME),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
];
if self.request_id.is_some() {
headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap()));
}
let call_args = CallArgs::new(
MODEL_SERVER_NAME,
ARCH_INTERNAL_CLUSTER_NAME,
"/hallucination",
vec![
(":method", "POST"),
(":path", "/hallucination"),
(":authority", MODEL_SERVER_NAME),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
],
headers,
Some(json_data.as_bytes()),
vec![],
Duration::from_secs(5),
@ -740,16 +774,24 @@ impl StreamContext {
let endpoint = prompt_target.endpoint.unwrap();
let path: String = endpoint.path.unwrap_or(String::from("/"));
let mut headers = vec![
(ARCH_UPSTREAM_HOST_HEADER, endpoint.name.as_str()),
(":method", "POST"),
(":path", &path),
(":authority", endpoint.name.as_str()),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
];
if self.request_id.is_some() {
headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap()));
}
let call_args = CallArgs::new(
&endpoint.name,
ARCH_INTERNAL_CLUSTER_NAME,
&path,
vec![
(":method", "POST"),
(":path", &path),
(":authority", endpoint.name.as_str()),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
],
headers,
Some(tool_params_json_str.as_bytes()),
vec![],
Duration::from_secs(5),
@ -799,10 +841,7 @@ impl StreamContext {
// add system prompt
let system_prompt = match prompt_target.system_prompt.as_ref() {
None => match self.system_prompt.as_ref() {
None => None,
Some(system_prompt) => Some(system_prompt.clone()),
},
None => self.system_prompt.as_ref().clone(),
Some(system_prompt) => Some(system_prompt.clone()),
};
if system_prompt.is_some() {
@ -927,17 +966,22 @@ impl StreamContext {
}
};
let mut headers = vec![
(ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME),
(":method", "POST"),
(":path", "/embeddings"),
(":authority", MODEL_SERVER_NAME),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
];
if self.request_id.is_some() {
headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap()));
}
let call_args = CallArgs::new(
MODEL_SERVER_NAME,
ARCH_INTERNAL_CLUSTER_NAME,
"/embeddings",
vec![
(":method", "POST"),
(":path", "/embeddings"),
(":authority", MODEL_SERVER_NAME),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
],
headers,
Some(json_data.as_bytes()),
vec![],
Duration::from_secs(5),
@ -1054,6 +1098,8 @@ impl HttpContext for StreamContext {
self.get_http_request_headers()
);
self.request_id = self.get_http_request_header(REQUEST_ID_HEADER);
Action::Continue
}
@ -1180,17 +1226,24 @@ impl HttpContext for StreamContext {
}
};
let mut headers = vec![
(ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME),
(":method", "POST"),
(":path", "/guard"),
(":authority", MODEL_SERVER_NAME),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
];
if self.request_id.is_some() {
headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap()));
}
let call_args = CallArgs::new(
MODEL_SERVER_NAME,
ARCH_INTERNAL_CLUSTER_NAME,
"/guard",
vec![
(":method", "POST"),
(":path", "/guard"),
(":authority", MODEL_SERVER_NAME),
("content-type", "application/json"),
("x-envoy-max-retries", "3"),
("x-envoy-upstream-rq-timeout-ms", "60000"),
],
headers,
Some(json_data.as_bytes()),
vec![],
Duration::from_secs(5),
@ -1286,6 +1339,7 @@ impl HttpContext for StreamContext {
match serde_json::from_slice(&body) {
Ok(de) => de,
Err(e) => {
debug!("invalid response: {}", String::from_utf8_lossy(&body));
self.send_server_error(ServerError::Deserialization(e), None);
return Action::Pause;
}

View file

@ -56,6 +56,8 @@ fn request_headers_expectations(module: &mut Tester, http_context: i32) {
.expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":path"))
.returning(Some("/v1/chat/completions"))
.expect_log(Some(LogLevel::Debug), None)
.expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("x-request-id"))
.returning(None)
.execute_and_expect(ReturnType::Action(Action::Continue))
.unwrap();
}
@ -95,8 +97,9 @@ fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) {
.returning(Some(chat_completions_request_body))
// The actual call is not important in this test, we just need to grab the token_id
.expect_http_call(
Some("model_server"),
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/guard"),
(":authority", "model_server"),
@ -135,8 +138,9 @@ fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) {
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("model_server"),
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/embeddings"),
(":authority", "model_server"),
@ -179,8 +183,9 @@ fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) {
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("model_server"),
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/zeroshot"),
(":authority", "model_server"),
@ -220,9 +225,10 @@ fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) {
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Info), None)
.expect_http_call(
Some("arch_fc"),
Some("arch_internal"),
Some(vec![
(":method", "POST"),
("x-arch-upstream", "arch_fc"),
(":path", "/v1/chat/completions"),
(":authority", "arch_fc"),
("content-type", "application/json"),
@ -262,8 +268,9 @@ fn setup_filter(module: &mut Tester, config: &str) -> i32 {
.call_proxy_on_tick(filter_context)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("model_server"),
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/embeddings"),
(":authority", "model_server"),
@ -441,7 +448,7 @@ fn successful_request_to_open_ai_chat_completions() {
.expect_get_buffer_bytes(Some(BufferType::HttpRequestBody))
.returning(Some(chat_completions_request_body))
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(Some("model_server"), None, None, None, None)
.expect_http_call(Some("arch_internal"), None, None, None, None)
.returning(Some(4))
.expect_metric_increment("active_http_calls", 1)
.execute_and_expect(ReturnType::Action(Action::Pause))
@ -573,8 +580,9 @@ fn request_ratelimited() {
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("model_server"),
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/hallucination"),
(":authority", "model_server"),
@ -605,8 +613,9 @@ fn request_ratelimited() {
.returning(Some(&body_text))
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("api_server"),
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "api_server"),
(":method", "POST"),
(":path", "/weather"),
(":authority", "api_server"),
@ -713,8 +722,9 @@ fn request_not_ratelimited() {
.expect_log(Some(LogLevel::Debug), None)
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("model_server"),
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "model_server"),
(":method", "POST"),
(":path", "/hallucination"),
(":authority", "model_server"),
@ -750,8 +760,9 @@ fn request_not_ratelimited() {
.returning(Some(&body_text))
.expect_log(Some(LogLevel::Debug), None)
.expect_http_call(
Some("api_server"),
Some("arch_internal"),
Some(vec![
("x-arch-upstream", "api_server"),
(":method", "POST"),
(":path", "/weather"),
(":authority", "api_server"),

View file

@ -67,12 +67,14 @@ def validate_and_render_schema():
config_yaml = add_secret_key_to_llm_providers(config_yaml)
arch_llm_providers = config_yaml["llm_providers"]
arch_tracing = config_yaml.get("tracing", {})
arch_config_string = yaml.dump(config_yaml)
data = {
'arch_config': arch_config_string,
'arch_clusters': inferred_clusters,
'arch_llm_providers': arch_llm_providers
'arch_llm_providers': arch_llm_providers,
'arch_tracing': arch_tracing
}
rendered = template.render(data)

View file

@ -1,15 +1,16 @@
FROM python:3 AS base
FROM python:3.10 AS base
FROM base AS builder
WORKDIR /src
COPY requirements.txt /src/
RUN pip install --prefix=/runtime --force-reinstall -r requirements.txt
COPY . /src
FROM python:3-slim AS output
FROM python:3.10-slim AS output
COPY --from=builder /runtime /usr/local

View file

@ -0,0 +1,11 @@
FROM alpine:3.20@sha256:beefdbd8a1da6d2915566fde36db9db0b524eb737fc57cd1367effd16dc0d06d AS otelc_curl
RUN apk --update add curl
FROM otel/opentelemetry-collector:latest@sha256:aef3e6d742fb69b94e9c0813a028449d28438bb6f9c93cb5d0b8d0704b78ae65
COPY --from=otelc_curl / /
COPY ./otel-collector-config.yaml /etc/otel-collector-config.yaml
USER 0
RUN chmod o+r /etc/otel-collector-config.yaml
USER nobody

View file

@ -80,3 +80,6 @@ ratelimits:
limit:
tokens: 1
unit: minute
tracing:
random_sampling: 100

View file

@ -21,6 +21,22 @@ services:
- MISTRAL_API_KEY=${MISTRAL_API_KEY:?error}
- CHAT_COMPLETION_ENDPOINT=http://host.docker.internal:10000/v1
opentelemetry:
build:
context: .
dockerfile: Dockerfile-opentelemetry
healthcheck:
test: ["CMD-SHELL", "curl -sf http://localhost:13133 || exit 1"]
interval: 1s
timeout: 120s
retries: 120
start_period: 5s
command: ["--config=/etc/otel-collector-config.yaml"]
ports:
- "${PORT_UI:-55679}:55679"
- "${PORT_GRPC:-4317}:4317"
- "${PORT_HTTP:-4318}:4318"
prometheus:
image: prom/prometheus
container_name: prometheus

View file

@ -0,0 +1,40 @@
extensions:
memory_ballast:
size_mib: 512
zpages:
endpoint: 0.0.0.0:55679
health_check:
receivers:
otlp:
protocols:
grpc:
endpoint: 0.0.0.0:4317
http:
endpoint: 0.0.0.0:4318
processors:
batch:
memory_limiter:
# 75% of maximum memory up to 4G
limit_mib: 1536
# 25% of limit up to 2G
spike_limit_mib: 512
check_interval: 5s
exporters:
debug:
verbosity: detailed
service:
pipelines:
traces:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [debug]
metrics:
receivers: [otlp]
processors: [memory_limiter, batch]
exporters: [debug]
extensions: [memory_ballast, zpages, health_check]

View file

@ -37,7 +37,7 @@ Benefits of Using ``Traceparent`` Headers
How to Initiate A Trace
-----------------------
1. **Enable Tracing Configuration**: Simply add the ``tracing: 100`` flag to in the :ref:`listener <arch_overview_listeners>` config
1. **Enable Tracing Configuration**: Simply set ``random_sampling`` to ``100`` in the ``tracing`` section of the :ref:`listener <arch_overview_listeners>` config
2. **Trace Context Propagation**: Arch automatically propagates the ``traceparent`` header. When a request is received, Arch will:
@ -46,7 +46,7 @@ How to Initiate A Trace
- Start a new span representing its processing of the request.
- Forward the ``traceparent`` header to downstream services.
3. **Sampling Policy**: The 100 in ``tracing: 100`` means that all the requests as sampled for tracing.
3. **Sampling Policy**: The 100 in ``random_sampling: 100`` means that all requests are sampled for tracing.
You can adjust this value from 0-100.

View file

@ -103,4 +103,6 @@ error_target:
name: error_target_1
path: /error
tracing: 100 #sampling rate. Note by default Arch works on OpenTelemetry compatible tracing.
tracing:
# sampling rate. Note by default Arch works on OpenTelemetry compatible tracing.
sampling_rate: 0.1

View file

@ -8,6 +8,11 @@ pub struct Overrides {
pub prompt_target_intent_matching_threshold: Option<f64>,
}
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct Tracing {
pub sampling_rate: Option<f64>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Configuration {
pub version: String,
@ -19,8 +24,8 @@ pub struct Configuration {
pub prompt_guards: Option<PromptGuards>,
pub prompt_targets: Vec<PromptTarget>,
pub error_target: Option<ErrorTargetDetail>,
pub tracing: Option<i16>,
pub ratelimits: Option<Vec<Ratelimit>>,
pub tracing: Option<Tracing>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -277,6 +282,6 @@ mod test {
);
let tracing = config.tracing.as_ref().unwrap();
assert_eq!(*tracing, 100);
assert_eq!(tracing.sampling_rate.unwrap(), 0.1);
}
}