diff --git a/.gitignore b/.gitignore index 6278abc0..0fd2e447 100644 --- a/.gitignore +++ b/.gitignore @@ -29,3 +29,4 @@ model_server/model_server.egg-info model_server/venv_model_server model_server/build model_server/dist +arch_logs/ diff --git a/arch/arch_config_schema.yaml b/arch/arch_config_schema.yaml index f6349681..b0785e1e 100644 --- a/arch/arch_config_schema.yaml +++ b/arch/arch_config_schema.yaml @@ -144,6 +144,12 @@ properties: - model - selector - limit + tracing: + type: object + properties: + random_sampling: + type: integer + additionalProperties: false additionalProperties: false required: - version diff --git a/arch/build_filter.sh b/arch/build_filter.sh deleted file mode 100644 index 36736112..00000000 --- a/arch/build_filter.sh +++ /dev/null @@ -1,3 +0,0 @@ -RUST_VERSION=1.80.0 -docker run --rm -v rustup_cache:/usr/local/rustup/ rust:$RUST_VERSION rustup -v target add wasm32-wasi -docker run --rm -v $PWD/../open-message-format:/code/open-message-format -v ~/.cargo:/root/.cargo -v $(pwd):/code/arch -w /code/arch -v rustup_cache:/usr/local/rustup/ rust:$RUST_VERSION cargo build --release --target wasm32-wasi diff --git a/arch/build_filter_image.sh b/arch/build_filter_image.sh new file mode 100644 index 00000000..a0b6f55b --- /dev/null +++ b/arch/build_filter_image.sh @@ -0,0 +1 @@ +docker build -t archgw .. -f Dockerfile diff --git a/arch/docker-compose.dev.yaml b/arch/docker-compose.dev.yaml index 8c8ce464..33b692bb 100644 --- a/arch/docker-compose.dev.yaml +++ b/arch/docker-compose.dev.yaml @@ -3,11 +3,15 @@ services: image: archgw:latest ports: - "10000:10000" + - "11000:11000" - "19901:9901" volumes: - ${ARCH_CONFIG_FILE:-../demos/function_calling/arch_config.yaml}:/config/arch_config.yaml - /etc/ssl/cert.pem:/etc/ssl/cert.pem - - ./envoy.template.dev.yaml:/config/envoy.template.yaml + - ./envoy.template.yaml:/config/envoy.template.yaml - ./target/wasm32-wasi/release/intelligent_prompt_gateway.wasm:/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm + - ./arch_config_schema.yaml:/config/arch_config_schema.yaml + - ./tools/config_generator.py:/config/config_generator.py + - ./arch_logs:/var/log/ env_file: - stage.env diff --git a/arch/envoy.template.dev.yaml b/arch/envoy.template.dev.yaml deleted file mode 100644 index 9e83cf44..00000000 --- a/arch/envoy.template.dev.yaml +++ /dev/null @@ -1,179 +0,0 @@ -admin: - address: - socket_address: { address: 0.0.0.0, port_value: 9901 } -static_resources: - listeners: - address: - socket_address: - address: 0.0.0.0 - port_value: 10000 - filter_chains: - - filters: - - name: envoy.filters.network.http_connection_manager - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - stat_prefix: arch_ingress_http - codec_type: AUTO - scheme_header_transformation: - scheme_to_overwrite: https - access_log: - - name: envoy.access_loggers.file - typed_config: - "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog - path: "/var/log/arch_access.log" - route_config: - name: local_routes - virtual_hosts: - - name: local_service - domains: - - "*" - routes: - - match: - prefix: "/mistral/v1/chat/completions" - route: - auto_host_rewrite: true - cluster: mistral_7b_instruct - timeout: 60s - {% for provider in arch_llm_providers %} - - match: - prefix: "/" - headers: - - name: "x-arch-llm-provider" - string_match: - exact: {{ provider.name }} - route: - auto_host_rewrite: true - cluster: {{ provider.provider }} - timeout: 60s - {% 
endfor %} - http_filters: - - name: envoy.filters.http.wasm - typed_config: - "@type": type.googleapis.com/udpa.type.v1.TypedStruct - type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm - value: - config: - name: "http_config" - configuration: - "@type": "type.googleapis.com/google.protobuf.StringValue" - value: | - {{ arch_config | indent(30) }} - vm_config: - runtime: "envoy.wasm.runtime.v8" - code: - local: - filename: "/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm" - - name: envoy.filters.http.router - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router - clusters: - - name: openai - connect_timeout: 5s - type: LOGICAL_DNS - dns_lookup_family: V4_ONLY - lb_policy: ROUND_ROBIN - load_assignment: - cluster_name: openai - endpoints: - - lb_endpoints: - - endpoint: - address: - socket_address: - address: api.openai.com - port_value: 443 - hostname: "api.openai.com" - transport_socket: - name: envoy.transport_sockets.tls - typed_config: - "@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext - sni: api.openai.com - common_tls_context: - tls_params: - tls_minimum_protocol_version: TLSv1_2 - tls_maximum_protocol_version: TLSv1_3 - - name: mistral - connect_timeout: 5s - type: LOGICAL_DNS - dns_lookup_family: V4_ONLY - lb_policy: ROUND_ROBIN - load_assignment: - cluster_name: mistral - endpoints: - - lb_endpoints: - - endpoint: - address: - socket_address: - address: api.mistral.ai - port_value: 443 - hostname: "api.mistral.ai" - transport_socket: - name: envoy.transport_sockets.tls - typed_config: - "@type": type.googleapis.com/envoy.extensions.transport_sockets.tls.v3.UpstreamTlsContext - sni: api.mistral.ai - - name: model_server - connect_timeout: 5s - type: STRICT_DNS - dns_lookup_family: V4_ONLY - lb_policy: ROUND_ROBIN - load_assignment: - cluster_name: model_server - endpoints: - - lb_endpoints: - - endpoint: - address: - socket_address: - address: host.docker.internal - port_value: 51000 - hostname: "model_server" - - name: mistral_7b_instruct - connect_timeout: 5s - type: STRICT_DNS - dns_lookup_family: V4_ONLY - lb_policy: ROUND_ROBIN - load_assignment: - cluster_name: mistral_7b_instruct - endpoints: - - lb_endpoints: - - endpoint: - address: - socket_address: - address: mistral_7b_instruct - port_value: 10001 - hostname: "mistral_7b_instruct" - - name: arch_fc - connect_timeout: 5s - type: STRICT_DNS - dns_lookup_family: V4_ONLY - lb_policy: ROUND_ROBIN - load_assignment: - cluster_name: arch_fc - endpoints: - - lb_endpoints: - - endpoint: - address: - socket_address: - address: host.docker.internal - port_value: 51000 - hostname: "arch_fc" -{% for _, cluster in arch_clusters.items() %} - - name: {{ cluster.name }} - {% if cluster.connect_timeout -%} - connect_timeout: {{ cluster.connect_timeout }} - {% else -%} - connect_timeout: 5s - {% endif -%} - type: LOGICAL_DNS - dns_lookup_family: V4_ONLY - lb_policy: ROUND_ROBIN - load_assignment: - cluster_name: {{ cluster.name }} - endpoints: - - lb_endpoints: - - endpoint: - address: - socket_address: - address: {{ cluster.endpoint }} - port_value: {{ cluster.port }} - hostname: {{ cluster.name }} -{% endfor %} diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml index 9e83cf44..30f13713 100644 --- a/arch/envoy.template.yaml +++ b/arch/envoy.template.yaml @@ -3,69 +3,165 @@ admin: socket_address: { address: 0.0.0.0, port_value: 9901 } static_resources: listeners: - address: - socket_address: - address: 0.0.0.0 - 
port_value: 10000 - filter_chains: - - filters: - - name: envoy.filters.network.http_connection_manager - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager - stat_prefix: arch_ingress_http - codec_type: AUTO - scheme_header_transformation: - scheme_to_overwrite: https - access_log: - - name: envoy.access_loggers.file - typed_config: - "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog - path: "/var/log/arch_access.log" - route_config: - name: local_routes - virtual_hosts: - - name: local_service - domains: - - "*" - routes: - - match: - prefix: "/mistral/v1/chat/completions" - route: - auto_host_rewrite: true - cluster: mistral_7b_instruct - timeout: 60s - {% for provider in arch_llm_providers %} - - match: - prefix: "/" - headers: - - name: "x-arch-llm-provider" - string_match: - exact: {{ provider.name }} - route: - auto_host_rewrite: true - cluster: {{ provider.provider }} - timeout: 60s - {% endfor %} - http_filters: - - name: envoy.filters.http.wasm + - name: arch_listener_http + address: + socket_address: + address: 0.0.0.0 + port_value: 10000 + traffic_direction: INBOUND + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + {% if arch_tracing.random_sampling > 0 %} + generate_request_id: true + tracing: + provider: + name: envoy.tracers.opentelemetry + typed_config: + "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig + grpc_service: + envoy_grpc: + cluster_name: opentelemetry_collector + timeout: 0.250s + service_name: arch + random_sampling: + value: {{ arch_tracing.random_sampling }} + {% endif %} + stat_prefix: arch_listener_http + codec_type: AUTO + scheme_header_transformation: + scheme_to_overwrite: https + access_log: + - name: envoy.access_loggers.file typed_config: - "@type": type.googleapis.com/udpa.type.v1.TypedStruct - type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm - value: - config: - name: "http_config" - configuration: - "@type": "type.googleapis.com/google.protobuf.StringValue" - value: | - {{ arch_config | indent(30) }} - vm_config: - runtime: "envoy.wasm.runtime.v8" - code: - local: - filename: "/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm" - - name: envoy.filters.http.router + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "/var/log/arch_access.log" + route_config: + name: local_routes + virtual_hosts: + - name: local_service + domains: + - "*" + routes: + {% for provider in arch_llm_providers %} + - match: + prefix: "/" + headers: + - name: "x-arch-llm-provider" + string_match: + exact: {{ provider.name }} + route: + auto_host_rewrite: true + cluster: {{ provider.provider }} + timeout: 60s + {% endfor %} + - match: + prefix: "/" + direct_response: + status: 400 + body: + inline_string: "x-arch-llm-provider header not set, cannot perform routing\n" + http_filters: + - name: envoy.filters.http.wasm + typed_config: + "@type": type.googleapis.com/udpa.type.v1.TypedStruct + type_url: type.googleapis.com/envoy.extensions.filters.http.wasm.v3.Wasm + value: + config: + name: "http_config" + configuration: + "@type": "type.googleapis.com/google.protobuf.StringValue" + value: | + {{ arch_config | indent(32) }} + vm_config: + runtime: "envoy.wasm.runtime.v8" + code: + local: + filename: 
"/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm" + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + + - name: arch_internal + address: + socket_address: + address: 0.0.0.0 + port_value: 11000 + traffic_direction: OUTBOUND + filter_chains: + - filters: + - name: envoy.filters.network.http_connection_manager + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager + {% if arch_tracing.random_sampling > 0 %} + generate_request_id: true + tracing: + provider: + name: envoy.tracers.opentelemetry + typed_config: + "@type": type.googleapis.com/envoy.config.trace.v3.OpenTelemetryConfig + grpc_service: + envoy_grpc: + cluster_name: opentelemetry_collector + timeout: 0.250s + service_name: arch + random_sampling: + value: {{ arch_tracing.random_sampling }} + {% endif %} + stat_prefix: arch_internal + codec_type: AUTO + scheme_header_transformation: + scheme_to_overwrite: https + access_log: + - name: envoy.access_loggers.file typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router + "@type": type.googleapis.com/envoy.extensions.access_loggers.file.v3.FileAccessLog + path: "/var/log/arch_access_internal.log" + route_config: + name: local_routes + virtual_hosts: + - name: local_service + domains: + - "*" + routes: + - match: + prefix: "/" + headers: + - name: "x-arch-upstream" + string_match: + exact: model_server + route: + auto_host_rewrite: true + cluster: model_server + timeout: 60s + - match: + prefix: "/" + headers: + - name: "x-arch-upstream" + string_match: + exact: arch_fc + route: + auto_host_rewrite: true + cluster: model_server + timeout: 60s + {% for _, cluster in arch_clusters.items() %} + - match: + prefix: "/" + headers: + - name: "x-arch-upstream" + string_match: + exact: {{ cluster.name }} + route: + auto_host_rewrite: true + cluster: {{ cluster.name }} + timeout: 60s + {% endfor %} + http_filters: + - name: envoy.filters.http.router + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router clusters: - name: openai connect_timeout: 5s @@ -177,3 +273,39 @@ static_resources: port_value: {{ cluster.port }} hostname: {{ cluster.name }} {% endfor %} + - name: arch_internal + connect_timeout: 5s + type: LOGICAL_DNS + dns_lookup_family: V4_ONLY + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: arch_internal + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 0.0.0.0 + port_value: 11000 + hostname: arch_internal + +{% if arch_tracing.random_sampling > 0 %} + - name: opentelemetry_collector + type: STRICT_DNS + dns_lookup_family: V4_ONLY + lb_policy: ROUND_ROBIN + typed_extension_protocol_options: + envoy.extensions.upstreams.http.v3.HttpProtocolOptions: + "@type": type.googleapis.com/envoy.extensions.upstreams.http.v3.HttpProtocolOptions + explicit_http_config: + http2_protocol_options: {} + load_assignment: + cluster_name: opentelemetry_collector + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: host.docker.internal + port_value: 4317 +{% endif %} diff --git a/arch/src/consts.rs b/arch/src/consts.rs index f6fbdc9d..48dd5494 100644 --- a/arch/src/consts.rs +++ b/arch/src/consts.rs @@ -15,3 +15,6 @@ pub const ARCH_PROVIDER_HINT_HEADER: &str = "x-arch-llm-provider-hint"; pub const CHAT_COMPLETIONS_PATH: &str = "v1/chat/completions"; pub const ARCH_STATE_HEADER: &str = 
"x-arch-state"; pub const ARCH_FC_MODEL_NAME: &str = "Arch-Function-1.5B"; +pub const REQUEST_ID_HEADER: &str = "x-request-id"; +pub const ARCH_INTERNAL_CLUSTER_NAME: &str = "arch_internal"; +pub const ARCH_UPSTREAM_HOST_HEADER: &str = "x-arch-upstream"; diff --git a/arch/src/filter_context.rs b/arch/src/filter_context.rs index e0a80596..491484bb 100644 --- a/arch/src/filter_context.rs +++ b/arch/src/filter_context.rs @@ -1,4 +1,7 @@ -use crate::consts::{DEFAULT_EMBEDDING_MODEL, MODEL_SERVER_NAME}; +use crate::consts::{ + ARCH_INTERNAL_CLUSTER_NAME, ARCH_UPSTREAM_HOST_HEADER, DEFAULT_EMBEDDING_MODEL, + MODEL_SERVER_NAME, +}; use crate::http::{CallArgs, Client}; use crate::llm_providers::LlmProviders; use crate::ratelimit; @@ -98,9 +101,10 @@ impl FilterContext { let json_data = serde_json::to_string(&embeddings_input).unwrap(); let call_args = CallArgs::new( - MODEL_SERVER_NAME, + ARCH_INTERNAL_CLUSTER_NAME, "/embeddings", vec![ + (ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME), (":method", "POST"), (":path", "/embeddings"), (":authority", MODEL_SERVER_NAME), diff --git a/arch/src/stream_context.rs b/arch/src/stream_context.rs index 9824ede1..e80c824a 100644 --- a/arch/src/stream_context.rs +++ b/arch/src/stream_context.rs @@ -1,9 +1,9 @@ use crate::consts::{ - ARCH_FC_MODEL_NAME, ARCH_FC_REQUEST_TIMEOUT_MS, ARCH_MESSAGES_KEY, ARCH_PROVIDER_HINT_HEADER, - ARCH_ROUTING_HEADER, ARCH_STATE_HEADER, ARC_FC_CLUSTER, CHAT_COMPLETIONS_PATH, - DEFAULT_EMBEDDING_MODEL, DEFAULT_HALLUCINATED_THRESHOLD, DEFAULT_INTENT_MODEL, - DEFAULT_PROMPT_TARGET_THRESHOLD, GPT_35_TURBO, MODEL_SERVER_NAME, - RATELIMIT_SELECTOR_HEADER_KEY, SYSTEM_ROLE, USER_ROLE, + ARCH_FC_MODEL_NAME, ARCH_FC_REQUEST_TIMEOUT_MS, ARCH_INTERNAL_CLUSTER_NAME, ARCH_MESSAGES_KEY, + ARCH_PROVIDER_HINT_HEADER, ARCH_ROUTING_HEADER, ARCH_STATE_HEADER, ARCH_UPSTREAM_HOST_HEADER, + ARC_FC_CLUSTER, CHAT_COMPLETIONS_PATH, DEFAULT_EMBEDDING_MODEL, DEFAULT_HALLUCINATED_THRESHOLD, + DEFAULT_INTENT_MODEL, DEFAULT_PROMPT_TARGET_THRESHOLD, GPT_35_TURBO, MODEL_SERVER_NAME, + RATELIMIT_SELECTOR_HEADER_KEY, REQUEST_ID_HEADER, SYSTEM_ROLE, USER_ROLE, }; use crate::filter_context::{EmbeddingsStore, WasmMetrics}; use crate::http::{CallArgs, Client, ClientError}; @@ -109,9 +109,11 @@ pub struct StreamContext { prompt_guards: Rc, llm_providers: Rc, llm_provider: Option>, + request_id: Option, } impl StreamContext { + #[allow(clippy::too_many_arguments)] pub fn new( context_id: u32, metrics: Rc, @@ -143,6 +145,7 @@ impl StreamContext { llm_provider: None, prompt_guards, overrides, + request_id: None, } } fn llm_provider(&self) -> &LlmProvider { @@ -292,17 +295,24 @@ impl StreamContext { } }; + let mut headers = vec![ + (ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME), + (":method", "POST"), + (":path", "/zeroshot"), + (":authority", MODEL_SERVER_NAME), + ("content-type", "application/json"), + ("x-envoy-max-retries", "3"), + ("x-envoy-upstream-rq-timeout-ms", "60000"), + ]; + + if self.request_id.is_some() { + headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap())); + } + let call_args = CallArgs::new( - MODEL_SERVER_NAME, + ARCH_INTERNAL_CLUSTER_NAME, "/zeroshot", - vec![ - (":method", "POST"), - (":path", "/zeroshot"), - (":authority", MODEL_SERVER_NAME), - ("content-type", "application/json"), - ("x-envoy-max-retries", "3"), - ("x-envoy-upstream-rq-timeout-ms", "60000"), - ], + headers, Some(json_data.as_bytes()), vec![], Duration::from_secs(5), @@ -470,17 +480,25 @@ impl StreamContext { debug!("no prompt target found with similarity score above 
threshold, using default prompt target"); let timeout_str = ARCH_FC_REQUEST_TIMEOUT_MS.to_string(); + + let mut headers = vec![ + (":method", "POST"), + (ARCH_UPSTREAM_HOST_HEADER, &upstream_endpoint), + (":path", &upstream_path), + (":authority", &upstream_endpoint), + ("content-type", "application/json"), + ("x-envoy-max-retries", "3"), + ("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()), + ]; + + if self.request_id.is_some() { + headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap())); + } + let call_args = CallArgs::new( - &upstream_endpoint, + ARCH_INTERNAL_CLUSTER_NAME, &upstream_path, - vec![ - (":method", "POST"), - (":path", &upstream_path), - (":authority", &upstream_endpoint), - ("content-type", "application/json"), - ("x-envoy-max-retries", "3"), - ("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()), - ], + headers, Some(arch_messages_json.as_bytes()), vec![], Duration::from_secs(5), @@ -578,17 +596,25 @@ impl StreamContext { }; let timeout_str = ARCH_FC_REQUEST_TIMEOUT_MS.to_string(); + + let mut headers = vec![ + (":method", "POST"), + (ARCH_UPSTREAM_HOST_HEADER, ARC_FC_CLUSTER), + (":path", "/v1/chat/completions"), + (":authority", ARC_FC_CLUSTER), + ("content-type", "application/json"), + ("x-envoy-max-retries", "3"), + ("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()), + ]; + + if self.request_id.is_some() { + headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap())); + } + let call_args = CallArgs::new( - ARC_FC_CLUSTER, + ARCH_INTERNAL_CLUSTER_NAME, "/v1/chat/completions", - vec![ - (":method", "POST"), - (":path", "/v1/chat/completions"), - (":authority", ARC_FC_CLUSTER), - ("content-type", "application/json"), - ("x-envoy-max-retries", "3"), - ("x-envoy-upstream-rq-timeout-ms", timeout_str.as_str()), - ], + headers, Some(msg_body.as_bytes()), vec![], Duration::from_secs(5), @@ -693,17 +719,25 @@ impl StreamContext { return self.send_server_error(ServerError::Serialization(error), None); } }; + + let mut headers = vec![ + (ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME), + (":method", "POST"), + (":path", "/hallucination"), + (":authority", MODEL_SERVER_NAME), + ("content-type", "application/json"), + ("x-envoy-max-retries", "3"), + ("x-envoy-upstream-rq-timeout-ms", "60000"), + ]; + + if self.request_id.is_some() { + headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap())); + } + let call_args = CallArgs::new( - MODEL_SERVER_NAME, + ARCH_INTERNAL_CLUSTER_NAME, "/hallucination", - vec![ - (":method", "POST"), - (":path", "/hallucination"), - (":authority", MODEL_SERVER_NAME), - ("content-type", "application/json"), - ("x-envoy-max-retries", "3"), - ("x-envoy-upstream-rq-timeout-ms", "60000"), - ], + headers, Some(json_data.as_bytes()), vec![], Duration::from_secs(5), @@ -740,16 +774,24 @@ impl StreamContext { let endpoint = prompt_target.endpoint.unwrap(); let path: String = endpoint.path.unwrap_or(String::from("/")); + + let mut headers = vec![ + (ARCH_UPSTREAM_HOST_HEADER, endpoint.name.as_str()), + (":method", "POST"), + (":path", &path), + (":authority", endpoint.name.as_str()), + ("content-type", "application/json"), + ("x-envoy-max-retries", "3"), + ]; + + if self.request_id.is_some() { + headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap())); + } + let call_args = CallArgs::new( - &endpoint.name, + ARCH_INTERNAL_CLUSTER_NAME, &path, - vec![ - (":method", "POST"), - (":path", &path), - (":authority", endpoint.name.as_str()), - ("content-type", "application/json"), - ("x-envoy-max-retries", "3"), - 
], + headers, Some(tool_params_json_str.as_bytes()), vec![], Duration::from_secs(5), @@ -799,10 +841,7 @@ impl StreamContext { // add system prompt let system_prompt = match prompt_target.system_prompt.as_ref() { - None => match self.system_prompt.as_ref() { - None => None, - Some(system_prompt) => Some(system_prompt.clone()), - }, + None => self.system_prompt.as_ref().clone(), Some(system_prompt) => Some(system_prompt.clone()), }; if system_prompt.is_some() { @@ -927,17 +966,22 @@ impl StreamContext { } }; + let mut headers = vec![ + (ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME), + (":method", "POST"), + (":path", "/embeddings"), + (":authority", MODEL_SERVER_NAME), + ("content-type", "application/json"), + ("x-envoy-max-retries", "3"), + ("x-envoy-upstream-rq-timeout-ms", "60000"), + ]; + if self.request_id.is_some() { + headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap())); + } let call_args = CallArgs::new( - MODEL_SERVER_NAME, + ARCH_INTERNAL_CLUSTER_NAME, "/embeddings", - vec![ - (":method", "POST"), - (":path", "/embeddings"), - (":authority", MODEL_SERVER_NAME), - ("content-type", "application/json"), - ("x-envoy-max-retries", "3"), - ("x-envoy-upstream-rq-timeout-ms", "60000"), - ], + headers, Some(json_data.as_bytes()), vec![], Duration::from_secs(5), @@ -1054,6 +1098,8 @@ impl HttpContext for StreamContext { self.get_http_request_headers() ); + self.request_id = self.get_http_request_header(REQUEST_ID_HEADER); + Action::Continue } @@ -1180,17 +1226,24 @@ impl HttpContext for StreamContext { } }; + let mut headers = vec![ + (ARCH_UPSTREAM_HOST_HEADER, MODEL_SERVER_NAME), + (":method", "POST"), + (":path", "/guard"), + (":authority", MODEL_SERVER_NAME), + ("content-type", "application/json"), + ("x-envoy-max-retries", "3"), + ("x-envoy-upstream-rq-timeout-ms", "60000"), + ]; + + if self.request_id.is_some() { + headers.push((REQUEST_ID_HEADER, self.request_id.as_ref().unwrap())); + } + let call_args = CallArgs::new( - MODEL_SERVER_NAME, + ARCH_INTERNAL_CLUSTER_NAME, "/guard", - vec![ - (":method", "POST"), - (":path", "/guard"), - (":authority", MODEL_SERVER_NAME), - ("content-type", "application/json"), - ("x-envoy-max-retries", "3"), - ("x-envoy-upstream-rq-timeout-ms", "60000"), - ], + headers, Some(json_data.as_bytes()), vec![], Duration::from_secs(5), @@ -1286,6 +1339,7 @@ impl HttpContext for StreamContext { match serde_json::from_slice(&body) { Ok(de) => de, Err(e) => { + debug!("invalid response: {}", String::from_utf8_lossy(&body)); self.send_server_error(ServerError::Deserialization(e), None); return Action::Pause; } diff --git a/arch/tests/integration.rs b/arch/tests/integration.rs index 9016b617..3fcf3fee 100644 --- a/arch/tests/integration.rs +++ b/arch/tests/integration.rs @@ -56,6 +56,8 @@ fn request_headers_expectations(module: &mut Tester, http_context: i32) { .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some(":path")) .returning(Some("/v1/chat/completions")) .expect_log(Some(LogLevel::Debug), None) + .expect_get_header_map_value(Some(MapType::HttpRequestHeaders), Some("x-request-id")) + .returning(None) .execute_and_expect(ReturnType::Action(Action::Continue)) .unwrap(); } @@ -95,8 +97,9 @@ fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) { .returning(Some(chat_completions_request_body)) // The actual call is not important in this test, we just need to grab the token_id .expect_http_call( - Some("model_server"), + Some("arch_internal"), Some(vec![ + ("x-arch-upstream", "model_server"), (":method", 
"POST"), (":path", "/guard"), (":authority", "model_server"), @@ -135,8 +138,9 @@ fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) { .expect_log(Some(LogLevel::Debug), None) .expect_log(Some(LogLevel::Debug), None) .expect_http_call( - Some("model_server"), + Some("arch_internal"), Some(vec![ + ("x-arch-upstream", "model_server"), (":method", "POST"), (":path", "/embeddings"), (":authority", "model_server"), @@ -179,8 +183,9 @@ fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) { .expect_log(Some(LogLevel::Debug), None) .expect_log(Some(LogLevel::Debug), None) .expect_http_call( - Some("model_server"), + Some("arch_internal"), Some(vec![ + ("x-arch-upstream", "model_server"), (":method", "POST"), (":path", "/zeroshot"), (":authority", "model_server"), @@ -220,9 +225,10 @@ fn normal_flow(module: &mut Tester, filter_context: i32, http_context: i32) { .expect_log(Some(LogLevel::Debug), None) .expect_log(Some(LogLevel::Info), None) .expect_http_call( - Some("arch_fc"), + Some("arch_internal"), Some(vec![ (":method", "POST"), + ("x-arch-upstream", "arch_fc"), (":path", "/v1/chat/completions"), (":authority", "arch_fc"), ("content-type", "application/json"), @@ -262,8 +268,9 @@ fn setup_filter(module: &mut Tester, config: &str) -> i32 { .call_proxy_on_tick(filter_context) .expect_log(Some(LogLevel::Debug), None) .expect_http_call( - Some("model_server"), + Some("arch_internal"), Some(vec![ + ("x-arch-upstream", "model_server"), (":method", "POST"), (":path", "/embeddings"), (":authority", "model_server"), @@ -441,7 +448,7 @@ fn successful_request_to_open_ai_chat_completions() { .expect_get_buffer_bytes(Some(BufferType::HttpRequestBody)) .returning(Some(chat_completions_request_body)) .expect_log(Some(LogLevel::Debug), None) - .expect_http_call(Some("model_server"), None, None, None, None) + .expect_http_call(Some("arch_internal"), None, None, None, None) .returning(Some(4)) .expect_metric_increment("active_http_calls", 1) .execute_and_expect(ReturnType::Action(Action::Pause)) @@ -573,8 +580,9 @@ fn request_ratelimited() { .expect_log(Some(LogLevel::Debug), None) .expect_log(Some(LogLevel::Debug), None) .expect_http_call( - Some("model_server"), + Some("arch_internal"), Some(vec![ + ("x-arch-upstream", "model_server"), (":method", "POST"), (":path", "/hallucination"), (":authority", "model_server"), @@ -605,8 +613,9 @@ fn request_ratelimited() { .returning(Some(&body_text)) .expect_log(Some(LogLevel::Debug), None) .expect_http_call( - Some("api_server"), + Some("arch_internal"), Some(vec![ + ("x-arch-upstream", "api_server"), (":method", "POST"), (":path", "/weather"), (":authority", "api_server"), @@ -713,8 +722,9 @@ fn request_not_ratelimited() { .expect_log(Some(LogLevel::Debug), None) .expect_log(Some(LogLevel::Debug), None) .expect_http_call( - Some("model_server"), + Some("arch_internal"), Some(vec![ + ("x-arch-upstream", "model_server"), (":method", "POST"), (":path", "/hallucination"), (":authority", "model_server"), @@ -750,8 +760,9 @@ fn request_not_ratelimited() { .returning(Some(&body_text)) .expect_log(Some(LogLevel::Debug), None) .expect_http_call( - Some("api_server"), + Some("arch_internal"), Some(vec![ + ("x-arch-upstream", "api_server"), (":method", "POST"), (":path", "/weather"), (":authority", "api_server"), diff --git a/arch/tools/config_generator.py b/arch/tools/config_generator.py index 1c3538f9..46cfd93d 100644 --- a/arch/tools/config_generator.py +++ b/arch/tools/config_generator.py @@ -67,12 +67,14 @@ def 
validate_and_render_schema(): config_yaml = add_secret_key_to_llm_providers(config_yaml) arch_llm_providers = config_yaml["llm_providers"] + arch_tracing = config_yaml.get("tracing", {}) arch_config_string = yaml.dump(config_yaml) data = { 'arch_config': arch_config_string, 'arch_clusters': inferred_clusters, - 'arch_llm_providers': arch_llm_providers + 'arch_llm_providers': arch_llm_providers, + 'arch_tracing': arch_tracing } rendered = template.render(data) diff --git a/chatbot_ui/Dockerfile b/chatbot_ui/Dockerfile index 5c83012a..68b41f94 100644 --- a/chatbot_ui/Dockerfile +++ b/chatbot_ui/Dockerfile @@ -1,15 +1,16 @@ -FROM python:3 AS base +FROM python:3.10 AS base FROM base AS builder WORKDIR /src COPY requirements.txt /src/ + RUN pip install --prefix=/runtime --force-reinstall -r requirements.txt COPY . /src -FROM python:3-slim AS output +FROM python:3.10-slim AS output COPY --from=builder /runtime /usr/local diff --git a/demos/function_calling/Dockerfile-opentelemetry b/demos/function_calling/Dockerfile-opentelemetry new file mode 100644 index 00000000..146d9585 --- /dev/null +++ b/demos/function_calling/Dockerfile-opentelemetry @@ -0,0 +1,11 @@ +FROM alpine:3.20@sha256:beefdbd8a1da6d2915566fde36db9db0b524eb737fc57cd1367effd16dc0d06d AS otelc_curl +RUN apk --update add curl + +FROM otel/opentelemetry-collector:latest@sha256:aef3e6d742fb69b94e9c0813a028449d28438bb6f9c93cb5d0b8d0704b78ae65 + +COPY --from=otelc_curl / / + +COPY ./otel-collector-config.yaml /etc/otel-collector-config.yaml +USER 0 +RUN chmod o+r /etc/otel-collector-config.yaml +USER nobody diff --git a/demos/function_calling/arch_config.yaml b/demos/function_calling/arch_config.yaml index fa381c0d..f60d50f3 100644 --- a/demos/function_calling/arch_config.yaml +++ b/demos/function_calling/arch_config.yaml @@ -80,3 +80,6 @@ ratelimits: limit: tokens: 1 unit: minute + +tracing: + random_sampling: 100 diff --git a/demos/function_calling/docker-compose.yaml b/demos/function_calling/docker-compose.yaml index 8cbe0da8..a4e316ae 100644 --- a/demos/function_calling/docker-compose.yaml +++ b/demos/function_calling/docker-compose.yaml @@ -21,6 +21,22 @@ services: - MISTRAL_API_KEY=${MISTRAL_API_KEY:?error} - CHAT_COMPLETION_ENDPOINT=http://host.docker.internal:10000/v1 + opentelemetry: + build: + context: . 
+      dockerfile: Dockerfile-opentelemetry
+    healthcheck:
+      test: ["CMD-SHELL", "curl -sf http://localhost:13133 || exit 1"]
+      interval: 1s
+      timeout: 120s
+      retries: 120
+      start_period: 5s
+    command: ["--config=/etc/otel-collector-config.yaml"]
+    ports:
+      - "${PORT_UI:-55679}:55679"
+      - "${PORT_GRPC:-4317}:4317"
+      - "${PORT_HTTP:-4318}:4318"
+
   prometheus:
     image: prom/prometheus
     container_name: prometheus
diff --git a/demos/function_calling/otel-collector-config.yaml b/demos/function_calling/otel-collector-config.yaml
new file mode 100755
index 00000000..903dd128
--- /dev/null
+++ b/demos/function_calling/otel-collector-config.yaml
@@ -0,0 +1,40 @@
+extensions:
+  memory_ballast:
+    size_mib: 512
+  zpages:
+    endpoint: 0.0.0.0:55679
+  health_check:
+
+receivers:
+  otlp:
+    protocols:
+      grpc:
+        endpoint: 0.0.0.0:4317
+      http:
+        endpoint: 0.0.0.0:4318
+
+processors:
+  batch:
+  memory_limiter:
+    # 75% of maximum memory up to 4G
+    limit_mib: 1536
+    # 25% of limit up to 2G
+    spike_limit_mib: 512
+    check_interval: 5s
+
+exporters:
+  debug:
+    verbosity: detailed
+
+service:
+  pipelines:
+    traces:
+      receivers: [otlp]
+      processors: [memory_limiter, batch]
+      exporters: [debug]
+    metrics:
+      receivers: [otlp]
+      processors: [memory_limiter, batch]
+      exporters: [debug]
+
+  extensions: [memory_ballast, zpages, health_check]
diff --git a/docs/source/guides/observability/tracing.rst b/docs/source/guides/observability/tracing.rst
index dfe30289..b1027d25 100644
--- a/docs/source/guides/observability/tracing.rst
+++ b/docs/source/guides/observability/tracing.rst
@@ -37,7 +37,7 @@ Benefits of Using ``Traceparent`` Headers
 How to Initiate A Trace
 -----------------------
 
-1. **Enable Tracing Configuration**: Simply add the ``tracing: 100`` flag to in the :ref:`listener ` config
+1. **Enable Tracing Configuration**: Simply set ``random_sampling`` to ``100`` in the ``tracing`` section of the :ref:`listener ` config
 
 2. **Trace Context Propagation**: Arch automatically propagates the ``traceparent`` header. When a request is received, Arch will:
 
@@ -46,7 +46,7 @@ How to Initiate A Trace
    - Start a new span representing its processing of the request.
    - Forward the ``traceparent`` header to downstream services.
 
-3. **Sampling Policy**: The 100 in ``tracing: 100`` means that all the requests as sampled for tracing.
+3. **Sampling Policy**: The 100 in ``random_sampling: 100`` means that all requests are sampled for tracing.
 
 You can adjust this value from 0-100.
 
diff --git a/docs/source/resources/includes/arch_config_full_reference.yaml b/docs/source/resources/includes/arch_config_full_reference.yaml
index 8ee8658e..2206aa2e 100644
--- a/docs/source/resources/includes/arch_config_full_reference.yaml
+++ b/docs/source/resources/includes/arch_config_full_reference.yaml
@@ -103,4 +103,6 @@ error_target:
   name: error_target_1
   path: /error
 
-tracing: 100 #sampling rate. Note by default Arch works on OpenTelemetry compatible tracing.
+tracing:
+  # sampling rate. Note by default Arch works on OpenTelemetry compatible tracing.
+  random_sampling: 100
diff --git a/public_types/src/configuration.rs b/public_types/src/configuration.rs
index 550ccf7c..d7f2e543 100644
--- a/public_types/src/configuration.rs
+++ b/public_types/src/configuration.rs
@@ -8,6 +8,11 @@ pub struct Overrides {
     pub prompt_target_intent_matching_threshold: Option,
 }
 
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct Tracing {
+    pub sampling_rate: Option,
+}
+
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct Configuration {
     pub version: String,
@@ -19,8 +24,8 @@ pub struct Configuration {
     pub prompt_guards: Option,
     pub prompt_targets: Vec,
     pub error_target: Option,
-    pub tracing: Option,
     pub ratelimits: Option>,
+    pub tracing: Option,
 }
 
 #[derive(Debug, Clone, Serialize, Deserialize)]
@@ -277,6 +282,6 @@ mod test {
         );
 
         let tracing = config.tracing.as_ref().unwrap();
-        assert_eq!(*tracing, 100);
+        assert_eq!(tracing.sampling_rate.unwrap(), 0.1);
     }
 }
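
A minimal sketch of the knob this change introduces, pieced together from arch_config_schema.yaml, the function_calling demo config, and tracing.rst (shown as an excerpt of arch_config.yaml, not a complete config):

tracing:
  # percentage of requests sampled for tracing, 0-100; 100 samples every request
  random_sampling: 100

When the value is above 0, the rendered envoy.template.yaml enables request-ID generation and the OpenTelemetry tracer, exporting spans over OTLP/gRPC to the opentelemetry_collector cluster (host.docker.internal:4317 in the demo), where the collector defined in otel-collector-config.yaml logs them via its debug exporter.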