first commit to get Bedrock Converse API working. Next commit support for streaming and binary frames

This commit is contained in:
Salman Paracha 2025-10-11 14:58:37 -07:00
parent 6a06d9ac97
commit bf67ea126f
30 changed files with 4842 additions and 1182 deletions

View file

@ -1,3 +1,4 @@
use hermesllm::clients::endpoints::SupportedUpstreamAPIs;
use http::StatusCode;
use log::{debug, info, warn};
use proxy_wasm::hostcalls::get_current_time;
@ -12,8 +13,8 @@ use std::time::{Duration, SystemTime, UNIX_EPOCH};
use crate::metrics::Metrics;
use common::configuration::{LlmProvider, LlmProviderType, Overrides};
use common::consts::{
ARCH_PROVIDER_HINT_HEADER, ARCH_ROUTING_HEADER, HEALTHZ_PATH, RATELIMIT_SELECTOR_HEADER_KEY,
REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
ARCH_IS_STREAMING_HEADER, ARCH_PROVIDER_HINT_HEADER, ARCH_ROUTING_HEADER, HEALTHZ_PATH,
RATELIMIT_SELECTOR_HEADER_KEY, REQUEST_ID_HEADER, TRACE_PARENT_HEADER,
};
use common::errors::ServerError;
use common::llm_providers::LlmProviders;
@ -33,7 +34,7 @@ pub struct StreamContext {
/// The API that is requested by the client (before compatibility mapping)
client_api: Option<SupportedAPIs>,
/// The API that should be used for the upstream provider (after compatibility mapping)
resolved_api: Option<SupportedAPIs>,
resolved_api: Option<SupportedUpstreamAPIs>,
llm_providers: Rc<LlmProviders>,
llm_provider: Option<Rc<LlmProvider>>,
request_id: Option<String>,
@ -108,6 +109,7 @@ impl StreamContext {
.model
.as_ref()
.unwrap_or(&"".to_string()),
self.streaming_response,
);
if target_endpoint != request_path {
self.set_http_request_header(":path", Some(&target_endpoint));
@ -148,14 +150,19 @@ impl StreamContext {
// Set API-specific headers based on the resolved upstream API
match self.resolved_api.as_ref() {
Some(SupportedAPIs::AnthropicMessagesAPI(_)) => {
Some(SupportedUpstreamAPIs::AnthropicMessagesAPI(_)) => {
// Anthropic API requires x-api-key and anthropic-version headers
// Remove any existing Authorization header since Anthropic doesn't use it
self.remove_http_request_header("Authorization");
self.set_http_request_header("x-api-key", Some(llm_provider_api_key_value));
self.set_http_request_header("anthropic-version", Some("2023-06-01"));
}
Some(SupportedAPIs::OpenAIChatCompletions(_)) | None => {
Some(
SupportedUpstreamAPIs::OpenAIChatCompletions(_)
| SupportedUpstreamAPIs::AmazonBedrockConverse(_)
| SupportedUpstreamAPIs::AmazonBedrockConverseStream(_),
)
| None => {
// OpenAI and default: use Authorization Bearer token
// Remove any existing x-api-key header since OpenAI doesn't use it
self.remove_http_request_header("x-api-key");
@ -410,7 +417,8 @@ impl StreamContext {
match self.client_api.as_ref() {
Some(client_api) => {
let client_api = client_api.clone(); // Clone to avoid borrowing issues
let upstream_api = provider_id.compatible_api_for_client(&client_api);
let upstream_api =
provider_id.compatible_api_for_client(&client_api, self.streaming_response);
// Parse body into SSE iterator using TryFrom
let sse_iter: SseStreamIter<std::vec::IntoIter<String>> =
@ -578,6 +586,11 @@ impl HttpContext for StreamContext {
return Action::Continue;
}
self.streaming_response = self
.get_http_request_header(ARCH_IS_STREAMING_HEADER)
.map(|val| val == "true")
.unwrap_or(false);
let use_agent_orchestrator = match self.overrides.as_ref() {
Some(overrides) => overrides.use_agent_orchestrator.unwrap_or_default(),
None => false,
@ -612,7 +625,17 @@ impl HttpContext for StreamContext {
(self.client_api.as_ref(), self.llm_provider.as_ref())
{
let provider_id = provider.to_provider_id();
self.resolved_api = Some(provider_id.compatible_api_for_client(api));
self.resolved_api =
Some(provider_id.compatible_api_for_client(api, self.streaming_response));
debug!(
"[ARCHGW_REQ_ID:{}] ROUTING_INFO: provider='{}' client_api={:?} resolved_api={:?} request_path='{}'",
self.request_identifier(),
provider.to_provider_id(),
api,
self.resolved_api,
request_path
);
} else {
self.resolved_api = None;
}
@ -697,7 +720,7 @@ impl HttpContext for StreamContext {
//We need to deserialize the request body based on the resolved API
let mut deserialized_client_request: ProviderRequestType = match self.client_api.as_ref() {
Some(the_client_api) => {
debug!(
info!(
"[ARCHGW_REQ_ID:{}] CLIENT_REQUEST_RECEIVED: api={:?} body_size={}",
self.request_identifier(),
the_client_api,
@ -795,7 +818,10 @@ impl HttpContext for StreamContext {
);
// Use provider interface for streaming detection and setup
self.streaming_response = deserialized_client_request.is_streaming();
// If streaming_response is not already set from headers, get it from the parsed request
if !self.streaming_response {
self.streaming_response = deserialized_client_request.is_streaming();
}
// Use provider interface for text extraction (after potential mutation)
let input_tokens_str = deserialized_client_request.extract_messages_text();