separate out currency exchange and preference based routing (#491)

This commit is contained in:
Adil Hafeez 2025-05-30 02:14:37 -07:00 committed by GitHub
parent 470cdf9843
commit fffa837a06
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 103 additions and 57 deletions

View file

@ -1,4 +1,7 @@
name: e2e demo tests
name: e2e demo tests currency conversion
permissions:
contents: read
on:
push:
@ -51,4 +54,4 @@ jobs:
GROQ_API_KEY: ${{ secrets.GROQ_API_KEY }}
run: |
source venv/bin/activate
cd demos/shared/test_runner && sh run_demo_tests.sh
cd demos/shared/test_runner && sh run_demo_tests.sh samples_python/currency_exchange

View file

@ -0,0 +1,57 @@
name: e2e demo preference based routing tests
permissions:
contents: read
on:
push:
branches:
- main
pull_request:
jobs:
e2e_demo_tests:
runs-on: ubuntu-latest-m
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: "3.12"
- name: build arch docker image
run: |
docker build -f arch/Dockerfile . -t katanemo/archgw -t katanemo/archgw:0.3.0
- name: install poetry
run: |
export POETRY_VERSION=1.8.5
curl -sSL https://install.python-poetry.org | python3 -
- name: setup python venv
run: |
python -m venv venv
- name: install hurl
run: |
curl --location --remote-name https://github.com/Orange-OpenSource/hurl/releases/download/4.0.0/hurl_4.0.0_amd64.deb
sudo dpkg -i hurl_4.0.0_amd64.deb
- name: install model server, arch gateway and test dependencies
run: |
source venv/bin/activate
cd model_server/ && echo "installing model server" && poetry install
cd ../arch/tools && echo "installing archgw cli" && poetry install
cd ../../demos/shared/test_runner && echo "installing test dependencies" && poetry install
- name: run demo tests
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
MISTRAL_API_KEY: ${{ secrets.MISTRAL_API_KEY }}
GROQ_API_KEY: ${{ secrets.GROQ_API_KEY }}
run: |
source venv/bin/activate
cd demos/shared/test_runner && sh run_demo_tests.sh use_cases/preference_based_routing

View file

@ -116,62 +116,40 @@ pub async fn chat_completions(
headers.insert(header_name, header_value.clone());
}
if chat_completion_request.stream {
// channel to create async stream
let (tx, rx) = mpsc::channel::<Bytes>(16);
// channel to create async stream
let (tx, rx) = mpsc::channel::<Bytes>(16);
// Spawn a task to send data as it becomes available
tokio::spawn(async move {
let mut byte_stream = llm_response.bytes_stream();
// Spawn a task to send data as it becomes available
tokio::spawn(async move {
let mut byte_stream = llm_response.bytes_stream();
while let Some(item) = byte_stream.next().await {
let item = match item {
Ok(item) => item,
Err(err) => {
warn!("Error receiving chunk: {:?}", err);
break;
}
};
if tx.send(item).await.is_err() {
warn!("Receiver dropped");
while let Some(item) = byte_stream.next().await {
let item = match item {
Ok(item) => item,
Err(err) => {
warn!("Error receiving chunk: {:?}", err);
break;
}
}
});
};
let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk)));
let stream_body = BoxBody::new(StreamBody::new(stream));
match response.body(stream_body) {
Ok(response) => Ok(response),
Err(err) => {
let err_msg = format!("Failed to create response: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(internal_error)
if tx.send(item).await.is_err() {
warn!("Receiver dropped");
break;
}
}
} else {
let body = match llm_response.text().await {
Ok(body) => body,
Err(err) => {
let err_msg = format!("Failed to read response: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
return Ok(internal_error);
}
};
});
match response.body(full(body)) {
Ok(response) => Ok(response),
Err(err) => {
let err_msg = format!("Failed to create response: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(internal_error)
}
let stream = ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk)));
let stream_body = BoxBody::new(StreamBody::new(stream));
match response.body(stream_body) {
Ok(response) => Ok(response),
Err(err) => {
let err_msg = format!("Failed to create response: {}", err);
let mut internal_error = Response::new(full(err_msg));
*internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
Ok(internal_error)
}
}
}

View file

@ -1,11 +1,20 @@
#!/bin/bash
set -eu
echo "docker images"
docker images
# load demo name from arguments
if [ $# -eq 0 ]; then
echo "No demo names provided. Please provide demo names as arguments."
# print usage
echo "Usage: $0 <demo_name1> <demo_name2> ..."
exit 1
fi
# for demo in currency_exchange hr_agent
for demo in samples_python/currency_exchange use_cases/preference_based_routing
# extract demo names from arguments
DEMOS="$@"
echo "Running tests for demos: $DEMOS"
for demo in $DEMOS
do
echo "******************************************"
echo "Running tests for $demo ..."
@ -16,11 +25,10 @@ do
echo "starting docker containers"
docker compose up -d 2>&1 > /dev/null
echo "starting hurl tests"
hurl --test hurl_tests/*.hurl
if [ $? -ne 0 ]; then
if ! hurl hurl_tests/*.hurl; then
echo "Hurl tests failed for $demo"
echo "docker logs for archgw:"
docker logs archgw
docker logs archgw | tail -n 100
exit 1
fi
echo "stopping docker containers and archgw"