diff --git a/crates/Cargo.lock b/crates/Cargo.lock
index ebe5b881..fbf817e7 100644
--- a/crates/Cargo.lock
+++ b/crates/Cargo.lock
@@ -436,11 +436,14 @@ name = "common"
 version = "0.1.0"
 dependencies = [
  "axum",
+ "bytes",
  "derivative",
  "duration-string",
  "governor",
  "hermesllm",
  "hex",
+ "http-body-util",
+ "hyper 1.6.0",
  "log",
  "pretty_assertions",
  "proxy-wasm",
diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs
index 22722895..dea736e3 100644
--- a/crates/brightstaff/src/handlers/agent_chat_completions.rs
+++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs
@@ -2,6 +2,7 @@ use std::sync::Arc;
 use std::time::Instant;
 
 use bytes::Bytes;
+use common::errors::BrightStaffError;
 use common::llm_providers::LlmProviders;
 use hermesllm::apis::OpenAIMessage;
 use hermesllm::clients::SupportedAPIsFromClient;
@@ -24,12 +25,12 @@ use crate::tracing::{operation_component, set_service_name};
 /// Main errors for agent chat completions
 #[derive(Debug, thiserror::Error)]
 pub enum AgentFilterChainError {
+    #[error("Forwarded error: {0}")]
+    Brightstaff(#[from] BrightStaffError),
     #[error("Agent selection error: {0}")]
     Selection(#[from] AgentSelectionError),
     #[error("Pipeline processing error: {0}")]
     Pipeline(#[from] PipelineError),
-    #[error("Response handling error: {0}")]
-    Response(#[from] super::response_handler::ResponseError),
     #[error("Request parsing error: {0}")]
     RequestParsing(#[from] serde_json::Error),
     #[error("HTTP error: {0}")]
@@ -103,16 +104,15 @@ pub async fn agent_chat(
                     "agent_response": body
                 });
 
+                let status_code = hyper::StatusCode::from_u16(*status)
+                    .unwrap_or(hyper::StatusCode::INTERNAL_SERVER_ERROR);
+
                 let json_string = error_json.to_string();
-                let mut response =
-                    Response::new(ResponseHandler::create_full_body(json_string));
-                *response.status_mut() = hyper::StatusCode::from_u16(*status)
-                    .unwrap_or(hyper::StatusCode::BAD_REQUEST);
-                response.headers_mut().insert(
-                    hyper::header::CONTENT_TYPE,
-                    "application/json".parse().unwrap(),
-                );
-                return Ok(response);
+                return Ok(BrightStaffError::ForwardedError {
+                    status_code,
+                    message: json_string,
+                }
+                .into_response());
             }
 
             // Print detailed error information with full error chain for other errors
@@ -145,8 +145,11 @@ pub async fn agent_chat(
             // Log the error for debugging
             info!(error = %error_json, "structured error info");
 
-            // Return JSON error response
-            Ok(ResponseHandler::create_json_error_response(&error_json))
+            Ok(BrightStaffError::ForwardedError {
+                status_code: StatusCode::BAD_REQUEST,
+                message: error_json.to_string(),
+            }
+            .into_response())
         }
     }
 }
@@ -249,10 +252,7 @@ async fn handle_agent_chat_inner(
             None => {
                 let err_msg = "No model specified in request and no default provider configured";
                 warn!("{}", err_msg);
-                let mut bad_request =
-                    Response::new(ResponseHandler::create_full_body(err_msg.to_string()));
-                *bad_request.status_mut() = StatusCode::BAD_REQUEST;
-                return Ok(bad_request);
+                return Ok(BrightStaffError::NoModelSpecified.into_response());
             }
         }
     }
diff --git a/crates/brightstaff/src/handlers/integration_tests.rs b/crates/brightstaff/src/handlers/integration_tests.rs
index 70eaacd7..70b2999d 100644
--- a/crates/brightstaff/src/handlers/integration_tests.rs
+++ b/crates/brightstaff/src/handlers/integration_tests.rs
@@ -5,9 +5,10 @@ use hyper::header::HeaderMap;
 
 use crate::handlers::agent_selector::{AgentSelectionError, AgentSelector};
 use crate::handlers::pipeline_processor::PipelineProcessor;
-use crate::handlers::response_handler::ResponseHandler;
 use crate::router::plano_orchestrator::OrchestratorService;
-
+use common::errors::BrightStaffError;
+use http_body_util::BodyExt;
+use hyper::StatusCode;
 /// Integration test that demonstrates the modular agent chat flow
 /// This test shows how the three main components work together:
 /// 1. AgentSelector - selects the appropriate agents based on orchestration
@@ -128,8 +129,24 @@ mod tests {
         }
 
         // Test 4: Error Response Creation
-        let error_response = ResponseHandler::create_bad_request("Test error");
-        assert_eq!(error_response.status(), hyper::StatusCode::BAD_REQUEST);
+        let err = BrightStaffError::ModelNotFound("gpt-5-secret".to_string());
+        let response = err.into_response();
+
+        assert_eq!(response.status(), StatusCode::NOT_FOUND);
+
+        // Helper to extract body as JSON
+        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
+        let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
+
+        assert_eq!(body["error"]["code"], "ModelNotFound");
+        assert_eq!(
+            body["error"]["details"]["rejected_model_id"],
+            "gpt-5-secret"
+        );
+        assert!(body["error"]["message"]
+            .as_str()
+            .unwrap()
+            .contains("gpt-5-secret"));
 
         println!("✅ All modular components working correctly!");
     }
@@ -148,12 +165,21 @@ mod tests {
             AgentSelectionError::ListenerNotFound(_)
         ));
 
-        // Test error response creation
-        let error_response = ResponseHandler::create_internal_error("Pipeline failed");
-        assert_eq!(
-            error_response.status(),
-            hyper::StatusCode::INTERNAL_SERVER_ERROR
-        );
+        let technical_reason = "Database connection timed out";
+        let err = BrightStaffError::InternalServerError(technical_reason.to_string());
+
+        let response = err.into_response();
+
+        // --- 1. EXTRACT BYTES ---
+        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
+
+        // --- 2. DECLARE body_json HERE ---
+        let body_json: serde_json::Value =
+            serde_json::from_slice(&body_bytes).expect("Failed to parse JSON body");
+
+        // --- 3. USE body_json ---
+        assert_eq!(body_json["error"]["code"], "InternalServerError");
+        assert_eq!(body_json["error"]["details"]["reason"], technical_reason);
 
         println!("✅ Error handling working correctly!");
     }
diff --git a/crates/brightstaff/src/handlers/llm.rs b/crates/brightstaff/src/handlers/llm.rs
index 435fb6f5..8e8f9661 100644
--- a/crates/brightstaff/src/handlers/llm.rs
+++ b/crates/brightstaff/src/handlers/llm.rs
@@ -8,9 +8,9 @@ use hermesllm::apis::openai_responses::InputParam;
 use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
 use hermesllm::{ProviderRequest, ProviderRequestType};
 use http_body_util::combinators::BoxBody;
-use http_body_util::{BodyExt, Full};
+use http_body_util::BodyExt;
 use hyper::header::{self};
-use hyper::{Request, Response, StatusCode};
+use hyper::{Request, Response};
 use opentelemetry::global;
 use opentelemetry::trace::get_active_span;
 use opentelemetry_http::HeaderInjector;
@@ -30,11 +30,7 @@ use crate::state::{
 };
 use crate::tracing::{llm as tracing_llm, operation_component, set_service_name};
 
-fn full<T: Into<Bytes>>(chunk: T) -> BoxBody<Bytes, hyper::Error> {
-    Full::new(chunk.into())
-        .map_err(|never| match never {})
-        .boxed()
-}
+use common::errors::BrightStaffError;
 
 pub async fn llm_chat(
     request: Request<hyper::body::Incoming>,
@@ -135,10 +131,11 @@ async fn llm_chat_inner(
                 error = %err,
                 "failed to parse request as ProviderRequestType"
             );
-            let err_msg = format!("Failed to parse request: {}", err);
-            let mut bad_request = Response::new(full(err_msg));
-            *bad_request.status_mut() = StatusCode::BAD_REQUEST;
-            return Ok(bad_request);
+            return Ok(BrightStaffError::InvalidRequest(format!(
+                "Failed to parse request: {}",
+                err
+            ))
+            .into_response());
         }
     };
 
@@ -163,9 +160,7 @@ async fn llm_chat_inner(
             None => {
                 let err_msg = "No model specified in request and no default provider configured";
                 warn!("{}", err_msg);
-                let mut bad_request = Response::new(full(err_msg.to_string()));
-                *bad_request.status_mut() = StatusCode::BAD_REQUEST;
-                return Ok(bad_request);
+                return Ok(BrightStaffError::NoModelSpecified.into_response());
             }
         }
     } else {
@@ -186,14 +181,8 @@ async fn llm_chat_inner(
         .get(&alias_resolved_model)
         .is_none()
     {
-        let err_msg = format!(
-            "Model '{}' not found in configured providers",
-            alias_resolved_model
-        );
         warn!(model = %alias_resolved_model, "model not found in configured providers");
-        let mut bad_request = Response::new(full(err_msg));
-        *bad_request.status_mut() = StatusCode::BAD_REQUEST;
-        return Ok(bad_request);
+        return Ok(BrightStaffError::ModelNotFound(alias_resolved_model).into_response());
     }
 
     // Handle provider/model slug format (e.g., "openai/gpt-4")
@@ -288,13 +277,10 @@ async fn llm_chat_inner(
         Err(StateStorageError::NotFound(_)) => {
             // Return 409 Conflict when previous_response_id not found
             warn!(previous_response_id = %prev_resp_id, "previous response_id not found");
-            let err_msg = format!(
-                "Conversation state not found for previous_response_id: {}",
-                prev_resp_id
-            );
-            let mut conflict_response = Response::new(full(err_msg));
-            *conflict_response.status_mut() = StatusCode::CONFLICT;
-            return Ok(conflict_response);
+            return Ok(BrightStaffError::ConversationStateNotFound(
+                prev_resp_id.to_string(),
+            )
+            .into_response());
         }
         Err(e) => {
             // Log warning but continue on other storage errors
@@ -345,9 +331,11 @@ async fn llm_chat_inner(
     {
         Ok(result) => result,
        Err(err) => {
-            let mut internal_error = Response::new(full(err.message));
-            *internal_error.status_mut() = err.status_code;
-            return Ok(internal_error);
+            return Ok(BrightStaffError::ForwardedError {
+                status_code: err.status_code,
+                message: err.message,
+            }
+            .into_response());
         }
     };
 
@@ -413,10 +401,11 @@ async fn llm_chat_inner(
     {
         Ok(res) => res,
         Err(err) => {
-            let err_msg = format!("Failed to send request: {}", err);
-            let mut internal_error = Response::new(full(err_msg));
-            *internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
-            return Ok(internal_error);
+            return Ok(BrightStaffError::InternalServerError(format!(
+                "Failed to send request: {}",
+                err
+            ))
+            .into_response());
         }
     };
 
@@ -473,12 +462,11 @@ async fn llm_chat_inner(
 
     match response.body(streaming_response.body) {
         Ok(response) => Ok(response),
-        Err(err) => {
-            let err_msg = format!("Failed to create response: {}", err);
-            let mut internal_error = Response::new(full(err_msg));
-            *internal_error.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
-            Ok(internal_error)
-        }
+        Err(err) => Ok(BrightStaffError::InternalServerError(format!(
+            "Failed to create response: {}",
+            err
+        ))
+        .into_response()),
     }
 }
diff --git a/crates/brightstaff/src/handlers/response_handler.rs b/crates/brightstaff/src/handlers/response_handler.rs
index e2561a8f..7331ab4c 100644
--- a/crates/brightstaff/src/handlers/response_handler.rs
+++ b/crates/brightstaff/src/handlers/response_handler.rs
@@ -1,25 +1,17 @@
 use bytes::Bytes;
+use common::errors::BrightStaffError;
 use hermesllm::apis::OpenAIApi;
 use hermesllm::clients::{SupportedAPIsFromClient, SupportedUpstreamAPIs};
 use hermesllm::SseEvent;
 use http_body_util::combinators::BoxBody;
 use http_body_util::{BodyExt, Full, StreamBody};
 use hyper::body::Frame;
-use hyper::{Response, StatusCode};
+use hyper::Response;
 use tokio::sync::mpsc;
 use tokio_stream::wrappers::ReceiverStream;
 use tokio_stream::StreamExt;
 use tracing::{info, warn, Instrument};
 
-/// Errors that can occur during response handling
-#[derive(Debug, thiserror::Error)]
-pub enum ResponseError {
-    #[error("Failed to create response: {0}")]
-    ResponseCreationFailed(#[from] hyper::http::Error),
-    #[error("Stream error: {0}")]
-    StreamError(String),
-}
-
 /// Service for handling HTTP responses and streaming
 pub struct ResponseHandler;
 
@@ -35,40 +27,6 @@ impl ResponseHandler {
             .boxed()
     }
 
-    /// Create an error response with a given status code and message
-    pub fn create_error_response(
-        status: StatusCode,
-        message: &str,
-    ) -> Response<BoxBody<Bytes, hyper::Error>> {
-        let mut response = Response::new(Self::create_full_body(message.to_string()));
-        *response.status_mut() = status;
-        response
-    }
-
-    /// Create a bad request response
-    pub fn create_bad_request(message: &str) -> Response<BoxBody<Bytes, hyper::Error>> {
-        Self::create_error_response(StatusCode::BAD_REQUEST, message)
-    }
-
-    /// Create an internal server error response
-    pub fn create_internal_error(message: &str) -> Response<BoxBody<Bytes, hyper::Error>> {
-        Self::create_error_response(StatusCode::INTERNAL_SERVER_ERROR, message)
-    }
-
-    /// Create a JSON error response
-    pub fn create_json_error_response(
-        error_json: &serde_json::Value,
-    ) -> Response<BoxBody<Bytes, hyper::Error>> {
-        let json_string = error_json.to_string();
-        let mut response = Response::new(Self::create_full_body(json_string));
-        *response.status_mut() = StatusCode::INTERNAL_SERVER_ERROR;
-        response.headers_mut().insert(
-            hyper::header::CONTENT_TYPE,
-            "application/json".parse().unwrap(),
-        );
-        response
-    }
-
     /// Create a streaming response from a reqwest response.
     /// The spawned streaming task is instrumented with both `agent_span` and `orchestrator_span`
     /// so their durations reflect the actual time spent streaming to the client.
@@ -77,13 +35,13 @@ impl ResponseHandler {
         llm_response: reqwest::Response,
         agent_span: tracing::Span,
         orchestrator_span: tracing::Span,
-    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ResponseError> {
+    ) -> Result<Response<BoxBody<Bytes, hyper::Error>>, BrightStaffError> {
         // Copy headers from the original response
         let response_headers = llm_response.headers();
         let mut response_builder = Response::builder();
 
         let headers = response_builder.headers_mut().ok_or_else(|| {
-            ResponseError::StreamError("Failed to get mutable headers".to_string())
+            BrightStaffError::StreamError("Failed to get mutable headers".to_string())
         })?;
 
         for (header_name, header_value) in response_headers.iter() {
@@ -123,7 +81,7 @@ impl ResponseHandler {
 
         response_builder
             .body(stream_body)
-            .map_err(ResponseError::from)
+            .map_err(BrightStaffError::from)
     }
 
     /// Collect the full response body as a string
@@ -136,7 +94,7 @@ impl ResponseHandler {
     pub async fn collect_full_response(
         &self,
         llm_response: reqwest::Response,
-    ) -> Result<String, ResponseError> {
+    ) -> Result<String, BrightStaffError> {
         use hermesllm::apis::streaming_shapes::sse::SseStreamIter;
 
         let response_headers = llm_response.headers();
@@ -144,10 +102,9 @@ impl ResponseHandler {
             .get(hyper::header::CONTENT_TYPE)
             .is_some_and(|v| v.to_str().unwrap_or("").contains("text/event-stream"));
 
-        let response_bytes = llm_response
-            .bytes()
-            .await
-            .map_err(|e| ResponseError::StreamError(format!("Failed to read response: {}", e)))?;
+        let response_bytes = llm_response.bytes().await.map_err(|e| {
+            BrightStaffError::StreamError(format!("Failed to read response: {}", e))
+        })?;
 
         if is_sse_streaming {
             let client_api =
@@ -185,7 +142,7 @@ impl ResponseHandler {
         } else {
             // If not SSE, treat as regular text response
             let response_text = String::from_utf8(response_bytes.to_vec()).map_err(|e| {
-                ResponseError::StreamError(format!("Failed to decode response: {}", e))
+                BrightStaffError::StreamError(format!("Failed to decode response: {}", e))
             })?;
 
             Ok(response_text)
@@ -204,42 +161,6 @@ mod tests {
     use super::*;
     use hyper::StatusCode;
 
-    #[test]
-    fn test_create_bad_request() {
-        let response = ResponseHandler::create_bad_request("Invalid request");
-        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
-    }
-
-    #[test]
-    fn test_create_internal_error() {
-        let response = ResponseHandler::create_internal_error("Server error");
-        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
-    }
-
-    #[test]
-    fn test_create_error_response() {
-        let response =
-            ResponseHandler::create_error_response(StatusCode::NOT_FOUND, "Resource not found");
-        assert_eq!(response.status(), StatusCode::NOT_FOUND);
-    }
-
-    #[test]
-    fn test_create_json_error_response() {
-        let error_json = serde_json::json!({
-            "error": {
-                "type": "TestError",
-                "message": "Test error message"
-            }
-        });
-
-        let response = ResponseHandler::create_json_error_response(&error_json);
-        assert_eq!(response.status(), StatusCode::INTERNAL_SERVER_ERROR);
-        assert_eq!(
-            response.headers().get("content-type").unwrap(),
-            "application/json"
-        );
-    }
-
     #[tokio::test]
     async fn test_create_streaming_response_with_mock() {
         use mockito::Server;
diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml
index cb471bd6..dd2cba15 100644
--- a/crates/common/Cargo.toml
+++ b/crates/common/Cargo.toml
@@ -20,6 +20,9 @@ urlencoding = "2.1.3"
 url = "2.5.4"
 hermesllm = { version = "0.1.0", path = "../hermesllm" }
 serde_with = "3.13.0"
+hyper = "1.0"
+bytes = "1.0"
+http-body-util = "0.1"
 
 [features]
 default = []
@@ -30,3 +33,6 @@ serde_json = "1.0.64"
 serial_test = "3.2"
 axum = "0.7"
 tokio = { version = "1.44", features = ["sync", "time", "macros", "rt"] }
"time", "macros", "rt"] } +hyper = { version = "1.0", features = ["full"] } +bytes = "1.0" +http-body-util = "0.1" diff --git a/crates/common/src/errors.rs b/crates/common/src/errors.rs index 21af3c94..b2f57199 100644 --- a/crates/common/src/errors.rs +++ b/crates/common/src/errors.rs @@ -1,9 +1,13 @@ -use proxy_wasm::types::Status; - use crate::{api::open_ai::ChatCompletionChunkResponseError, ratelimit}; +use bytes::Bytes; use hermesllm::apis::openai::OpenAIError; +use http_body_util::{combinators::BoxBody, BodyExt, Full}; +use hyper::{Error as HyperError, Response, StatusCode}; +use proxy_wasm::types::Status; +use serde_json::json; +use thiserror::Error; -#[derive(thiserror::Error, Debug)] +#[derive(Error, Debug)] pub enum ClientError { #[error("Error dispatching HTTP call to `{upstream_name}/{path}`, error: {internal_status:?}")] DispatchError { @@ -13,7 +17,7 @@ pub enum ClientError { }, } -#[derive(thiserror::Error, Debug)] +#[derive(Error, Debug)] pub enum ServerError { #[error(transparent)] HttpDispatch(ClientError), @@ -43,3 +47,174 @@ pub enum ServerError { #[error("error parsing openai message: {0}")] OpenAIPError(#[from] OpenAIError), } +// ----------------------------------------------------------------------------- +// BrightStaff Errors (Standardized) +// ----------------------------------------------------------------------------- +#[derive(Debug, Error)] +pub enum BrightStaffError { + #[error("The requested model '{0}' does not exist")] + ModelNotFound(String), + + #[error("No model specified in request and no default provider configured")] + NoModelSpecified, + + #[error("Conversation state not found for previous_response_id: {0}")] + ConversationStateNotFound(String), + + #[error("Internal server error")] + InternalServerError(String), + + #[error("Invalid request")] + InvalidRequest(String), + + #[error("{message}")] + ForwardedError { + status_code: StatusCode, + message: String, + }, + + #[error("Stream error: {0}")] + StreamError(String), + + #[error("Failed to create response: {0}")] + ResponseCreationFailed(#[from] hyper::http::Error), +} + +impl BrightStaffError { + pub fn into_response(self) -> Response> { + let (status, code, details) = match &self { + BrightStaffError::ModelNotFound(model_name) => ( + StatusCode::NOT_FOUND, + "ModelNotFound", + json!({ "rejected_model_id": model_name }), + ), + + BrightStaffError::NoModelSpecified => { + (StatusCode::BAD_REQUEST, "NoModelSpecified", json!({})) + } + + BrightStaffError::ConversationStateNotFound(prev_resp_id) => ( + StatusCode::CONFLICT, + "ConversationStateNotFound", + json!({ "previous_response_id": prev_resp_id }), + ), + + BrightStaffError::InternalServerError(reason) => ( + StatusCode::INTERNAL_SERVER_ERROR, + "InternalServerError", + // Passing the reason into details for easier debugging + json!({ "reason": reason }), + ), + + BrightStaffError::InvalidRequest(reason) => ( + StatusCode::BAD_REQUEST, + "InvalidRequest", + json!({ "reason": reason }), + ), + + BrightStaffError::ForwardedError { + status_code, + message, + } => (*status_code, "ForwardedError", json!({ "reason": message })), + + BrightStaffError::StreamError(reason) => ( + StatusCode::BAD_REQUEST, + "StreamError", + json!({ "reason": reason }), + ), + + BrightStaffError::ResponseCreationFailed(reason) => ( + StatusCode::BAD_REQUEST, + "ResponseCreationFailed", + json!({ "reason": reason.to_string() }), + ), + }; + + let body_json = json!({ + "error": { + "code": code, + "message": self.to_string(), + "details": details + } + }); + + // 1. 
+        let full_body = Full::new(Bytes::from(body_json.to_string()));
+
+        // 2. Convert it to BoxBody
+        //    We map_err because Full never fails, but BoxBody expects a HyperError
+        let boxed_body = full_body
+            .map_err(|never| match never {}) // This handles the "Infallible" error type
+            .boxed();
+
+        Response::builder()
+            .status(status)
+            .header("content-type", "application/json")
+            .body(boxed_body)
+            .unwrap_or_else(|_| {
+                Response::new(
+                    Full::new(Bytes::from("Internal Error"))
+                        .map_err(|never| match never {})
+                        .boxed(),
+                )
+            })
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use http_body_util::BodyExt; // For .collect().await
+
+    #[tokio::test]
+    async fn test_model_not_found_format() {
+        let err = BrightStaffError::ModelNotFound("gpt-5-secret".to_string());
+        let response = err.into_response();
+
+        assert_eq!(response.status(), StatusCode::NOT_FOUND);
+
+        // Helper to extract body as JSON
+        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
+        let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
+
+        assert_eq!(body["error"]["code"], "ModelNotFound");
+        assert_eq!(
+            body["error"]["details"]["rejected_model_id"],
+            "gpt-5-secret"
+        );
+        assert!(body["error"]["message"]
+            .as_str()
+            .unwrap()
+            .contains("gpt-5-secret"));
+    }
+
+    #[tokio::test]
+    async fn test_forwarded_error_preserves_status() {
+        let err = BrightStaffError::ForwardedError {
+            status_code: StatusCode::TOO_MANY_REQUESTS,
+            message: "Rate limit exceeded on agent side".to_string(),
+        };
+
+        let response = err.into_response();
+        assert_eq!(response.status(), StatusCode::TOO_MANY_REQUESTS);
+
+        let body_bytes = response.into_body().collect().await.unwrap().to_bytes();
+        let body: serde_json::Value = serde_json::from_slice(&body_bytes).unwrap();
+
+        assert_eq!(body["error"]["code"], "ForwardedError");
+    }
+
+    #[tokio::test]
+    async fn test_hyper_error_wrapping() {
+        // Manually trigger a hyper error by creating an invalid URI/Header
+        let hyper_err = hyper::http::Response::builder()
+            .status(1000) // Invalid status
+            .body(())
+            .unwrap_err();
+
+        let err = BrightStaffError::ResponseCreationFailed(hyper_err);
+        let response = err.into_response();
+
+        assert_eq!(response.status(), StatusCode::BAD_REQUEST);
+    }
+}