diff --git a/arch/Dockerfile b/arch/Dockerfile index 073c0b6b..85721f58 100644 --- a/arch/Dockerfile +++ b/arch/Dockerfile @@ -12,6 +12,9 @@ FROM envoyproxy/envoy:v1.31-latest as envoy #Build config generator, so that we have a single build image for both Rust and Python FROM python:3-slim as arch + +RUN apt-get update && apt-get install -y gettext-base && apt-get clean && rm -rf /var/lib/apt/lists/* + COPY --from=builder /arch/target/wasm32-wasi/release/prompt_gateway.wasm /etc/envoy/proxy-wasm-plugins/prompt_gateway.wasm COPY --from=builder /arch/target/wasm32-wasi/release/llm_gateway.wasm /etc/envoy/proxy-wasm-plugins/llm_gateway.wasm COPY --from=envoy /usr/local/bin/envoy /usr/local/bin/envoy @@ -22,4 +25,5 @@ COPY arch/tools/cli/config_generator.py . COPY arch/envoy.template.yaml . COPY arch/arch_config_schema.yaml . -CMD ["sh", "-c", "python config_generator.py && envoy -c /etc/envoy/envoy.yaml --component-log-level wasm:debug"] + +ENTRYPOINT ["sh", "-c", "python config_generator.py && envsubst < /etc/envoy/envoy.yaml > /etc/envoy.env_sub.yaml && envoy -c /etc/envoy.env_sub.yaml --component-log-level wasm:debug"] diff --git a/arch/build_filter_image.sh b/arch/build_filter_image.sh index a0b6f55b..75ac81ce 100644 --- a/arch/build_filter_image.sh +++ b/arch/build_filter_image.sh @@ -1 +1 @@ -docker build -t archgw .. -f Dockerfile +docker build -f Dockerfile .. -t katanemo/archgw diff --git a/arch/docker-compose.dev.yaml b/arch/docker-compose.dev.yaml index 7457bfc5..c2dcb332 100644 --- a/arch/docker-compose.dev.yaml +++ b/arch/docker-compose.dev.yaml @@ -1,6 +1,6 @@ services: archgw: - image: archgw:latest + image: katanemo/archgw:latest ports: - "10000:10000" - "11000:11000" @@ -10,9 +10,12 @@ services: - ${ARCH_CONFIG_FILE:-../demos/function_calling/arch_config.yaml}:/config/arch_config.yaml - /etc/ssl/cert.pem:/etc/ssl/cert.pem - ./envoy.template.yaml:/config/envoy.template.yaml - - ./target/wasm32-wasi/release/intelligent_prompt_gateway.wasm:/etc/envoy/proxy-wasm-plugins/intelligent_prompt_gateway.wasm - ./arch_config_schema.yaml:/config/arch_config_schema.yaml - ./tools/cli/config_generator.py:/config/config_generator.py - - ./arch_logs:/var/log/ - env_file: - - stage.env + - ../crates/target/wasm32-wasi/release/llm_gateway.wasm:/etc/envoy/proxy-wasm-plugins/llm_gateway.wasm + - ../crates/target/wasm32-wasi/release/prompt_gateway.wasm:/etc/envoy/proxy-wasm-plugins/prompt_gateway.wasm + - ~/archgw_logs:/var/log/ + extra_hosts: + - "host.docker.internal:host-gateway" + environment: + - OPENAI_API_KEY=${OPENAI_API_KEY:?error} diff --git a/arch/docker-compose.yaml b/arch/docker-compose.yaml index 0a2e5a99..11424166 100644 --- a/arch/docker-compose.yaml +++ b/arch/docker-compose.yaml @@ -10,7 +10,7 @@ services: - ${ARCH_CONFIG_FILE:-./demos/function_calling/arch_confg.yaml}:/config/arch_config.yaml - /etc/ssl/cert.pem:/etc/ssl/cert.pem - ~/archgw_logs:/var/log/ - env_file: - - stage.env extra_hosts: - "host.docker.internal:host-gateway" + env_file: + - stage.env diff --git a/arch/envoy.template.yaml b/arch/envoy.template.yaml index 14e26e84..e64ac422 100644 --- a/arch/envoy.template.yaml +++ b/arch/envoy.template.yaml @@ -206,6 +206,18 @@ static_resources: body: inline_string: "x-arch-llm-provider header not set, llm gateway cannot perform routing\n" http_filters: + + + - name: envoy.filters.http.compressor + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.compressor.v3.Compressor + compressor_library: + name: compress + typed_config: + "@type": type.googleapis.com/envoy.extensions.compression.gzip.compressor.v3.Gzip + memory_level: 3 + window_bits: 10 + - name: envoy.filters.http.wasm typed_config: "@type": type.googleapis.com/udpa.type.v1.TypedStruct @@ -223,6 +235,22 @@ static_resources: code: local: filename: "/etc/envoy/proxy-wasm-plugins/llm_gateway.wasm" + + + + - name: envoy.filters.http.decompressor + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.decompressor.v3.Decompressor + decompressor_library: + name: decompress + typed_config: + "@type": "type.googleapis.com/envoy.extensions.compression.gzip.decompressor.v3.Gzip" + window_bits: 9 + chunk_size: 8192 + # If this ratio is set too low, then body data will not be decompressed completely. + max_inflate_ratio: 1000 + + - name: envoy.filters.http.router typed_config: "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router diff --git a/chatbot_ui/.vscode/launch.json b/chatbot_ui/.vscode/launch.json index 8b42a191..6f81b218 100644 --- a/chatbot_ui/.vscode/launch.json +++ b/chatbot_ui/.vscode/launch.json @@ -30,7 +30,7 @@ } }, { - "name": "chatbot-ui streaming", + "name": "chatbot-ui (llm) streaming", "cwd": "${workspaceFolder}/app", "type": "debugpy", "request": "launch", @@ -38,7 +38,7 @@ "console": "integratedTerminal", "env": { "LLM": "1", - "CHAT_COMPLETION_ENDPOINT": "http://localhost:10000/v1" + "CHAT_COMPLETION_ENDPOINT": "http://localhost:12000/v1" } } ] diff --git a/crates/common/src/common_types.rs b/crates/common/src/common_types.rs index 45c7a1be..16925c66 100644 --- a/crates/common/src/common_types.rs +++ b/crates/common/src/common_types.rs @@ -261,7 +261,10 @@ pub mod open_ai { fn try_from(value: &str) -> Result { let mut response_chunks: VecDeque = value - .split("data: ") + .lines() + .filter(|line| line.starts_with("data: ")) + .map(|line| line.get(6..).unwrap()) + .filter(|data_chunk| *data_chunk != "[DONE]") .map(|data_chunk| serde_json::from_str::(data_chunk)) .collect::, _>>()?; @@ -272,10 +275,10 @@ pub mod open_ai { .delta .content .take() - .ok_or(ChatCompletionChunkResponseError::EmptyContent) + .unwrap_or("".to_string()) }) - .collect::, _>>()? - .join(" "); + .collect::>() + .join(""); let mut response_chunk = response_chunks .pop_front() @@ -489,4 +492,58 @@ mod test { ParameterType::String ); } + + #[test] + fn stream_chunk_parse() { + use super::open_ai::{ChatCompletionChunkResponse, ChunkChoice, Delta}; + + const CHUNK_RESPONSE: &str = r#"data: {"id":"chatcmpl-ALmdmtKulBMEq3fRLbrnxJwcKOqvS","object":"chat.completion.chunk","created":1729755226,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"role":"assistant","content":"","refusal":null},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-ALmdmtKulBMEq3fRLbrnxJwcKOqvS","object":"chat.completion.chunk","created":1729755226,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"Hello"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-ALmdmtKulBMEq3fRLbrnxJwcKOqvS","object":"chat.completion.chunk","created":1729755226,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"!"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-ALmdmtKulBMEq3fRLbrnxJwcKOqvS","object":"chat.completion.chunk","created":1729755226,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" How"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-ALmdmtKulBMEq3fRLbrnxJwcKOqvS","object":"chat.completion.chunk","created":1729755226,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" can"},"logprobs":null,"finish_reason":null}]} + + +"#; + + let chunk_response: ChatCompletionChunkResponse = + ChatCompletionChunkResponse::try_from(CHUNK_RESPONSE).unwrap(); + assert_eq!(chunk_response.choices.len(), 1); + assert_eq!( + chunk_response.choices[0].delta.content.as_ref().unwrap(), + "Hello! How can" + ); + } + + #[test] + fn stream_chunk_parse_done() { + use super::open_ai::{ChatCompletionChunkResponse, ChunkChoice, Delta}; + + const CHUNK_RESPONSE: &str = r#"data: {"id":"chatcmpl-ALn2KTfmrIpYd9N3Un4Kyg08WIIP6","object":"chat.completion.chunk","created":1729756748,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" I"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-ALn2KTfmrIpYd9N3Un4Kyg08WIIP6","object":"chat.completion.chunk","created":1729756748,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" assist"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-ALn2KTfmrIpYd9N3Un4Kyg08WIIP6","object":"chat.completion.chunk","created":1729756748,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" you"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-ALn2KTfmrIpYd9N3Un4Kyg08WIIP6","object":"chat.completion.chunk","created":1729756748,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":" today"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-ALn2KTfmrIpYd9N3Un4Kyg08WIIP6","object":"chat.completion.chunk","created":1729756748,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{"content":"?"},"logprobs":null,"finish_reason":null}]} + +data: {"id":"chatcmpl-ALn2KTfmrIpYd9N3Un4Kyg08WIIP6","object":"chat.completion.chunk","created":1729756748,"model":"gpt-3.5-turbo-0125","system_fingerprint":null,"choices":[{"index":0,"delta":{},"logprobs":null,"finish_reason":"stop"}]} + +data: [DONE] +"#; + + let chunk_response: ChatCompletionChunkResponse = + ChatCompletionChunkResponse::try_from(CHUNK_RESPONSE).unwrap(); + assert_eq!(chunk_response.choices.len(), 1); + assert_eq!( + chunk_response.choices[0].delta.content.as_ref().unwrap(), + " I assist you today?" + ); + } } diff --git a/crates/llm_gateway/src/stream_context.rs b/crates/llm_gateway/src/stream_context.rs index 7ae841da..2bbc8101 100644 --- a/crates/llm_gateway/src/stream_context.rs +++ b/crates/llm_gateway/src/stream_context.rs @@ -12,7 +12,7 @@ use common::llm_providers::LlmProviders; use common::ratelimit::Header; use common::{ratelimit, routing, tokenizer}; use http::StatusCode; -use log::debug; +use log::{debug, warn}; use proxy_wasm::traits::*; use proxy_wasm::types::*; use std::num::NonZero; @@ -32,6 +32,7 @@ pub struct StreamContext { request_id: Option, } +#[derive(Debug)] struct StreamingResponse { bytes_read: usize, } @@ -252,16 +253,20 @@ impl HttpContext for StreamContext { fn on_http_response_body(&mut self, body_size: usize, end_of_stream: bool) -> Action { debug!( - "recv [S={}] bytes={} end_stream={}", + "on_http_response_body [S={}] bytes={} end_stream={}", self.context_id, body_size, end_of_stream ); if !self.is_chat_completions_request { + debug!("non-chatgpt request"); if let Some(body_str) = self .get_http_response_body(0, body_size) .and_then(|bytes| String::from_utf8(bytes).ok()) { - debug!("recv [S={}] body_str={}", self.context_id, body_str); + debug!( + "on_http_response_body non-chatgpt request [S={}] body_str={}", + self.context_id, body_str + ); } return Action::Continue; } @@ -272,29 +277,68 @@ impl HttpContext for StreamContext { let body = match self.streaming_response.take() { Some(mut streaming_response) => { - let streaming_chunk = self - .get_http_response_body(streaming_response.bytes_read, body_size) - .expect("cant get response body"); - streaming_response.bytes_read += body_size; + if end_of_stream && body_size == 0 { + return Action::Continue; + } + let chunk_start = 0; + let chunk_size = body_size; + debug!("streaming respose reading, {}..{}", chunk_start, chunk_size); + let streaming_chunk = match self.get_http_response_body(0, chunk_size) { + Some(chunk) => chunk, + None => { + warn!( + "response body empy, chunk_start: {}, chunk_size: {}", + chunk_start, chunk_size + ); + return Action::Continue; + } + }; + + if streaming_chunk.len() != chunk_size { + warn!( + "chunk size mismatch: read: {} != requested: {}", + streaming_chunk.len(), + chunk_size + ); + } + streaming_response.bytes_read += chunk_size; // n.b: this funky take and replace of the streaming_response struct is done to appease the borrow // checker which wouldn't let us take a mut ref of streaming_response, and then a ref for // `get_http_response_body` self.streaming_response = Some(streaming_response); streaming_chunk } - None => self - .get_http_response_body(0, body_size) - .expect("cant get response body"), + None => { + debug!("non streaming response bytes read: 0:{}", body_size); + match self.get_http_response_body(0, body_size) { + Some(body) => body, + None => { + warn!("non streaming response body empty"); + return Action::Continue; + } + } + } }; - if self.streaming_response.is_some() { - let body_str = String::from_utf8(body).expect("body is not utf-8"); - debug!("streaming response"); + let body_utf8 = match String::from_utf8(body.to_vec()) { + Ok(body_utf8) => body_utf8, + Err(e) => { + debug!("could not convert to utf8: {}", e); + return Action::Continue; + } + }; + debug!("chunk data: body str: {}", body_utf8); + + if self.streaming_response.is_some() { let chat_completions_chunk_response = - match ChatCompletionChunkResponse::try_from(body_str.as_str()) { + match ChatCompletionChunkResponse::try_from(body_utf8.as_str()) { Ok(response) => response, Err(e) => { + debug!( + "invalid streaming response: body str: {}, {:?}", + body_utf8, e + ); self.send_server_error(e.into(), None); return Action::Pause; } diff --git a/crates/prompt_gateway/src/hallucination.rs b/crates/prompt_gateway/src/hallucination.rs index 62b119ac..1ae25ab1 100644 --- a/crates/prompt_gateway/src/hallucination.rs +++ b/crates/prompt_gateway/src/hallucination.rs @@ -40,8 +40,8 @@ pub fn extract_messages_for_hallucination(messages: &Vec) -> Vec archgw: {}", String::from_utf8_lossy(&body_bytes)); + debug!( + "developer => archgw: {}", + String::from_utf8_lossy(&body_bytes) + ); // Deserialize body into spec. // Currently OpenAI API. diff --git a/demos/function_calling/arch_config.yaml b/demos/function_calling/arch_config.yaml index a2c92883..61f85882 100644 --- a/demos/function_calling/arch_config.yaml +++ b/demos/function_calling/arch_config.yaml @@ -17,7 +17,7 @@ overrides: llm_providers: - name: gpt - access_key: OPENAI_API_KEY + access_key: $OPENAI_API_KEY provider: openai model: gpt-3.5-turbo default: true