add support for v1/messages and transformations (#558)

* pushing draft PR

* transformations are working; tests are next

* updated tests and added the necessary response transformations for Anthropic's Messages response object

* fixed bugs for integration tests

* fixed doc tests

* fixed serialization issues with enums on response

* adding some debug logs to help

* fixed issues with non-streaming responses

* updated the stream_context to update response bytes

* the serialized bytes length must be set on the response side

* fixed the debug statement that was causing the integration tests for wasm to fail

* fixing json parsing errors

* intentionally removing the headers

* making sure that we convert the raw bytes to the correct provider type upstream

* fixing non-streaming responses to transform correctly

* /v1/messages works with transformations to and from /v1/chat/completions (see the sketch after this list)

* updating the CLI and demos to support anthropic vs. claude

* adding the anthropic key to the preference-based routing tests

* fixed test cases and added more structured logs

* fixed integration tests and cleaned up logs

* added python client tests for anthropic and openai

* cleaned up logs and fixed issue with connectivity for llm gateway in weather forecast demo

* fixed the tests; the Python dependency order was broken

* updated the OpenAI client to fix demos

* removed the raw response debug statement

* fixed the duplicate-cloning issue and cleaned up the ProviderRequestType enum and traits

* fixing logs

* moved away from string literals to consts

* fixed streaming from Anthropic Client to OpenAI

* removed debug statement that would likely trip up integration tests

* fixed integration tests for llm_gateway

* cleaned up test cases and removed unnecessary crates

* addressed review comments from the PR

* fixed a bug where we sent an OpenAIChatCompletions request object to llm_gateway even though the request may have been AnthropicMessages
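
The heart of this change is the mapping between Anthropic's /v1/messages schema and OpenAI's /v1/chat/completions schema. As a rough sketch of the request-side direction only (the real conversion lives behind hermesllm's ProviderRequestType; the structs below are simplified stand-ins, not the actual types):

// Simplified stand-ins for the hermesllm request types; field layout is assumed.
struct MessagesRequest {
    model: String,
    system: Option<String>,          // Anthropic carries the system prompt out-of-band
    messages: Vec<(String, String)>, // (role, content) pairs
    max_tokens: u32,                 // required on /v1/messages
}

struct ChatCompletionsRequest {
    model: String,
    messages: Vec<(String, String)>,
    max_tokens: Option<u32>,         // optional on /v1/chat/completions
}

impl From<MessagesRequest> for ChatCompletionsRequest {
    fn from(req: MessagesRequest) -> Self {
        // OpenAI expects the system prompt as the first entry in messages.
        let mut messages = Vec::new();
        if let Some(system) = req.system {
            messages.push(("system".to_string(), system));
        }
        messages.extend(req.messages);
        ChatCompletionsRequest {
            model: req.model,
            messages,
            max_tokens: Some(req.max_tokens),
        }
    }
}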

---------

Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-4.local>
Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-9.local>
Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-10.local>
Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-41.local>
Co-authored-by: Salman Paracha <salmanparacha@MacBook-Pro-136.local>
commit fb0581fd39 (parent bb71d041a0)
Salman Paracha, 2025-09-10 07:40:30 -07:00, committed by GitHub
38 changed files with 2842 additions and 919 deletions


@@ -4,6 +4,8 @@ use bytes::Bytes;
 use common::configuration::ModelUsagePreference;
 use common::consts::ARCH_PROVIDER_HINT_HEADER;
 use hermesllm::apis::openai::ChatCompletionsRequest;
+use hermesllm::clients::SupportedAPIs;
+use hermesllm::{ProviderRequest, ProviderRequestType};
 use http_body_util::combinators::BoxBody;
 use http_body_util::{BodyExt, Full, StreamBody};
 use hyper::body::Frame;
@@ -22,66 +24,61 @@ fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
         .boxed()
 }
-pub async fn chat_completions(
+pub async fn chat(
     request: Request<hyper::body::Incoming>,
     router_service: Arc<RouterService>,
-    llm_provider_endpoint: String,
+    full_qualified_llm_provider_url: String,
 ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, hyper::Error> {
+    let request_path = request.uri().path().to_string();
     let mut request_headers = request.headers().clone();
     let chat_request_bytes = request.collect().await?.to_bytes();
+    debug!("Received request body (raw utf8): {}", String::from_utf8_lossy(&chat_request_bytes));
+    let mut client_request = match ProviderRequestType::try_from((&chat_request_bytes[..], &SupportedAPIs::from_endpoint(request_path.as_str()).unwrap())) {
+        Ok(request) => request,
+        Err(err) => {
+            warn!("Failed to parse request as ProviderRequestType: {}", err);
+            let err_msg = format!("Failed to parse request: {}", err);
+            let mut bad_request = Response::new(full(err_msg));
+            *bad_request.status_mut() = StatusCode::BAD_REQUEST;
+            return Ok(bad_request);
+        }
+    };
-    let chat_request_parsed = serde_json::from_slice::<serde_json::Value>(&chat_request_bytes)
-        .inspect_err(|err| {
-            warn!(
-                "Failed to parse request body as JSON: err: {}, str: {}",
-                err,
-                String::from_utf8_lossy(&chat_request_bytes)
-            )
-        })
-        .unwrap_or_else(|_| {
-            warn!(
-                "Failed to parse request body as JSON: {}",
-                String::from_utf8_lossy(&chat_request_bytes)
-            );
-            serde_json::Value::Null
-        });
+    // Clone metadata for routing and remove archgw_preference_config from original
+    let routing_metadata = client_request.metadata().clone();
-    if chat_request_parsed == serde_json::Value::Null {
-        warn!("Request body is not valid JSON");
-        let err_msg = "Request body is not valid JSON".to_string();
-        let mut bad_request = Response::new(full(err_msg));
-        *bad_request.status_mut() = StatusCode::BAD_REQUEST;
-        return Ok(bad_request);
+    if client_request.remove_metadata_key("archgw_preference_config") {
+        debug!("Removed archgw_preference_config from metadata");
     }
-    let chat_completion_request: ChatCompletionsRequest =
-        serde_json::from_value(chat_request_parsed.clone()).unwrap();
+    let client_request_bytes_for_upstream = ProviderRequestType::to_bytes(&client_request).unwrap();
-    // remove metadata from the request
-    let mut chat_request_user_preferences_removed = chat_request_parsed;
-    if let Some(metadata) = chat_request_user_preferences_removed.get_mut("metadata") {
-        debug!("Removing metadata from request");
-        if let Some(m) = metadata.as_object_mut() {
-            m.remove("archgw_preference_config");
-            debug!("Removed archgw_preference_config from metadata");
-        }
-        // if metadata is empty, remove it
-        if metadata.as_object().map_or(false, |m| m.is_empty()) {
-            debug!("Removing empty metadata from request");
-            chat_request_user_preferences_removed
-                .as_object_mut()
-                .map(|m| m.remove("metadata"));
-        }
-    }
+    // Convert to ChatCompletionsRequest regardless of input type (clone to avoid moving original)
+    let chat_completions_request_for_arch_router: ChatCompletionsRequest =
+        match ProviderRequestType::try_from((client_request, &SupportedAPIs::OpenAIChatCompletions(hermesllm::apis::OpenAIApi::ChatCompletions))) {
+            Ok(ProviderRequestType::ChatCompletionsRequest(req)) => req,
+            Ok(ProviderRequestType::MessagesRequest(_)) => {
+                // This should not happen after conversion to OpenAI format
+                warn!("Unexpected: got MessagesRequest after converting to OpenAI format");
+                let err_msg = "Request conversion failed".to_string();
+                let mut bad_request = Response::new(full(err_msg));
+                *bad_request.status_mut() = StatusCode::BAD_REQUEST;
+                return Ok(bad_request);
+            },
+            Err(err) => {
+                warn!("Failed to convert request to ChatCompletionsRequest: {}", err);
+                let err_msg = format!("Failed to convert request: {}", err);
+                let mut bad_request = Response::new(full(err_msg));
+                *bad_request.status_mut() = StatusCode::BAD_REQUEST;
+                return Ok(bad_request);
+            }
+        };
     debug!(
-        "arch-router request received: {}",
-        &serde_json::to_string(&chat_completion_request).unwrap()
+        "[BRIGHTSTAFF -> ARCH_ROUTER] REQ: {}",
+        &serde_json::to_string(&chat_completions_request_for_arch_router).unwrap()
     );
     let trace_parent = request_headers
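
The pattern this hunk introduces is: parse the raw bytes into a provider-tagged enum based on the endpoint, then normalize to the OpenAI shape before routing. In isolation it looks roughly like this (all names are local stand-ins with assumed shapes, not the real hermesllm definitions):

use serde::Deserialize;

#[derive(Deserialize)]
struct ChatCompletionsRequest { model: String, messages: Vec<serde_json::Value> }

#[derive(Deserialize)]
struct MessagesRequest { model: String, messages: Vec<serde_json::Value> }

enum SupportedAPIs { OpenAIChatCompletions, AnthropicMessages }

enum ProviderRequestType {
    ChatCompletionsRequest(ChatCompletionsRequest),
    MessagesRequest(MessagesRequest),
}

impl ProviderRequestType {
    // Pick the parser from the endpoint the client called.
    fn from_bytes(bytes: &[u8], api: &SupportedAPIs) -> Result<Self, serde_json::Error> {
        match api {
            SupportedAPIs::OpenAIChatCompletions => {
                serde_json::from_slice(bytes).map(Self::ChatCompletionsRequest)
            }
            SupportedAPIs::AnthropicMessages => {
                serde_json::from_slice(bytes).map(Self::MessagesRequest)
            }
        }
    }

    // Normalize to the OpenAI shape for the arch-router (field mapping elided).
    fn into_chat_completions(self) -> ChatCompletionsRequest {
        match self {
            Self::ChatCompletionsRequest(req) => req,
            Self::MessagesRequest(req) => ChatCompletionsRequest {
                model: req.model,
                messages: req.messages, // the real code also folds in `system`, max_tokens, etc.
            },
        }
    }
}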
@@ -90,7 +87,7 @@ pub async fn chat_completions(
         .map(|(_, value)| value.to_str().unwrap_or_default().to_string());
     let usage_preferences_str: Option<String> =
-        chat_completion_request.metadata.and_then(|metadata| {
+        routing_metadata.as_ref().and_then(|metadata| {
             metadata
                 .get("archgw_preference_config")
                 .map(|value| value.to_string())
@@ -101,7 +98,7 @@
             .and_then(|s| serde_yaml::from_str(s).ok());
     let latest_message_for_log =
-        chat_completion_request
+        chat_completions_request_for_arch_router
             .messages
             .last()
             .map_or("None".to_string(), |msg| {
@@ -126,7 +123,7 @@
     let model_name = match router_service
         .determine_route(
-            &chat_completion_request.messages,
+            &chat_completions_request_for_arch_router.messages,
             trace_parent.clone(),
             usage_preferences,
         )
@@ -137,9 +134,9 @@
             None => {
                 debug!(
                     "No route determined, using default model from request: {}",
-                    chat_completion_request.model
+                    chat_completions_request_for_arch_router.model
                 );
-                chat_completion_request.model.clone()
+                chat_completions_request_for_arch_router.model.clone()
             }
         },
         Err(err) => {
@@ -151,8 +148,8 @@
     };
     debug!(
-        "sending request to llm provider: {}, with model hint: {}",
-        llm_provider_endpoint, model_name
+        "[BRIGHTSTAFF -> ARCH_ROUTER] URL: {}, Model Hint: {}",
+        full_qualified_llm_provider_url, model_name
     );
     request_headers.insert(
@@ -166,17 +163,13 @@
             header::HeaderValue::from_str(&trace_parent).unwrap(),
         );
     }
-    let chat_request_parsed_bytes =
-        serde_json::to_string(&chat_request_user_preferences_removed).unwrap();
+    // remove content-length header if it exists
+    request_headers.remove(header::CONTENT_LENGTH);
     let llm_response = match reqwest::Client::new()
-        .post(llm_provider_endpoint)
+        .post(full_qualified_llm_provider_url)
         .headers(request_headers)
-        .body(chat_request_parsed_bytes)
+        .body(client_request_bytes_for_upstream)
         .send()
         .await
     {
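
Removing Content-Length before the upstream send matters because the forwarded body is the re-serialized (and possibly Messages-to-ChatCompletions transformed) request, whose byte length can differ from what the client sent; with the header gone, reqwest computes a fresh one from the new body. A minimal, self-contained demonstration of the mismatch (payloads are hypothetical):

use serde_json::json;

fn main() {
    // What a client might send to /v1/messages (Anthropic shape).
    let original = json!({
        "model": "claude-3-5-sonnet",
        "system": "be brief",
        "max_tokens": 64,
        "messages": [{"role": "user", "content": "hi"}]
    });
    // What goes upstream after transformation to /v1/chat/completions:
    // the system prompt is folded into the messages array, so the
    // serialized length changes and the client's Content-Length is stale.
    let transformed = json!({
        "model": "claude-3-5-sonnet",
        "max_tokens": 64,
        "messages": [
            {"role": "system", "content": "be brief"},
            {"role": "user", "content": "hi"}
        ]
    });
    let before = serde_json::to_vec(&original).unwrap().len();
    let after = serde_json::to_vec(&transformed).unwrap().len();
    assert_ne!(before, after); // forwarding the original header would be wrong
}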


@@ -1,9 +1,10 @@
-use brightstaff::handlers::chat_completions::chat_completions;
+use brightstaff::handlers::chat_completions::chat;
 use brightstaff::handlers::models::list_models;
 use brightstaff::router::llm_router::RouterService;
 use brightstaff::utils::tracing::init_tracer;
 use bytes::Bytes;
 use common::configuration::Configuration;
+use common::consts::{CHAT_COMPLETIONS_PATH, MESSAGES_PATH};
 use http_body_util::{combinators::BoxBody, BodyExt, Empty};
 use hyper::body::Incoming;
 use hyper::server::conn::http1;
@@ -67,10 +68,10 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         &serde_json::to_string(arch_config.as_ref()).unwrap()
     );
-    let llm_provider_endpoint = env::var("LLM_PROVIDER_ENDPOINT")
-        .unwrap_or_else(|_| "http://localhost:12001/v1/chat/completions".to_string());
+    let llm_provider_url = env::var("LLM_PROVIDER_ENDPOINT")
+        .unwrap_or_else(|_| "http://localhost:12001".to_string());
-    info!("llm provider endpoint: {}", llm_provider_endpoint);
+    info!("llm provider url: {}", llm_provider_url);
     info!("listening on http://{}", bind_address);
     let listener = TcpListener::bind(bind_address).await?;
@@ -88,7 +89,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
     let router_service: Arc<RouterService> = Arc::new(RouterService::new(
         arch_config.llm_providers.clone(),
-        llm_provider_endpoint.clone(),
+        llm_provider_url.clone() + CHAT_COMPLETIONS_PATH,
         routing_model_name,
         routing_llm_provider,
     ));
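
CHAT_COMPLETIONS_PATH here comes from the "moved away from string literals to consts" change in the commit list. Plausible definitions in common::consts (the values are inferred from the replaced literals and the PR title, not verified against the file):

// Assumed shape of the new constants in common/src/consts.rs:
pub const CHAT_COMPLETIONS_PATH: &str = "/v1/chat/completions";
pub const MESSAGES_PATH: &str = "/v1/messages";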
@@ -99,19 +100,21 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
         let io = TokioIo::new(stream);
         let router_service: Arc<RouterService> = Arc::clone(&router_service);
-        let llm_provider_endpoint = llm_provider_endpoint.clone();
+        let llm_provider_url = llm_provider_url.clone();
         let llm_providers = llm_providers.clone();
         let service = service_fn(move |req| {
             let router_service = Arc::clone(&router_service);
             let parent_cx = extract_context_from_request(&req);
-            let llm_provider_endpoint = llm_provider_endpoint.clone();
+            let llm_provider_url = llm_provider_url.clone();
             let llm_providers = llm_providers.clone();
             async move {
                 match (req.method(), req.uri().path()) {
-                    (&Method::POST, "/v1/chat/completions") => {
-                        chat_completions(req, router_service, llm_provider_endpoint)
+                    (&Method::POST, CHAT_COMPLETIONS_PATH | MESSAGES_PATH) => {
+                        let fully_qualified_url = format!("{}{}", llm_provider_url, req.uri().path());
+                        chat(req, router_service, fully_qualified_url)
                             .with_context(parent_cx)
                             .await
                     }
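
With the widened match arm, both paths land in the same chat handler, and the request path picks the parser and the upstream URL. A hedged end-to-end sketch of exercising both endpoints (the gateway address and model names are illustrative; assumes reqwest with the json feature and tokio as dependencies):

use serde_json::json;

#[tokio::main]
async fn main() -> Result<(), reqwest::Error> {
    let client = reqwest::Client::new();
    let base = "http://localhost:10000"; // illustrative gateway bind address

    // OpenAI-style clients keep working unchanged...
    let openai = client
        .post(format!("{base}/v1/chat/completions"))
        .json(&json!({
            "model": "gpt-4o-mini",
            "messages": [{"role": "user", "content": "hello"}]
        }))
        .send()
        .await?;

    // ...and Anthropic-style clients now hit the same handler via /v1/messages.
    let anthropic = client
        .post(format!("{base}/v1/messages"))
        .json(&json!({
            "model": "claude-3-5-sonnet",
            "max_tokens": 128,
            "messages": [{"role": "user", "content": "hello"}]
        }))
        .send()
        .await?;

    println!("{} {}", openai.status(), anthropic.status());
    Ok(())
}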