From 8d80679822918464ea2c6cafbe3faf989ab03067 Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Wed, 22 Oct 2025 15:55:13 -0700 Subject: [PATCH] pending changes --- .../src/handlers/agent_chat_completions.rs | 39 +--- .../src/handlers/response_handler.rs | 169 +++--------------- demos/use_cases/rag_agent/test.rest | 12 +- 3 files changed, 29 insertions(+), 191 deletions(-) diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 001c9caf..6884ccbd 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -118,13 +118,6 @@ async fn handle_agent_chat( AgentFilterChainError::RequestParsing(err) })?; - // Check for debug/reasoning mode, by default its enabled - let debug_mode = request_headers - .get("x-debug-mode") - .and_then(|v| v.to_str().ok()) - .map(|v| v == "true") - .unwrap_or(true); - // Check if streaming is enabled let is_streaming = chat_completions_request.stream.unwrap_or(false); @@ -149,45 +142,15 @@ async fn handle_agent_chat( }; return response_handler - .create_streaming_response_with_realtime_reasoning( + .create_response_with_reasoning( &chat_completions_request, &selected_agent, &agent_map, &request_headers, &pipeline_processor, is_streaming, - debug_mode, ) .await .map_err(AgentFilterChainError::from); - // // For normal mode, process pipeline and stream response - // let processed_messages = pipeline_processor - // .process_filter_chain( - // &chat_completions_request, - // &selected_agent, - // &agent_map, - // &request_headers, - // ) - // .await?; - - // // Get terminal agent and send final response - // let terminal_agent = agent_map.get(&selected_agent.id).unwrap(); - - // debug!("Processing terminal agent: {}", selected_agent.id); - // debug!("Terminal agent details: {:?}", terminal_agent); - - // let llm_response = pipeline_processor - // .invoke_upstream_agent( - // &processed_messages, - // &chat_completions_request, - // terminal_agent, - // &request_headers, - // ) - // .await?; - - // response_handler - // .create_streaming_response(llm_response) - // .await - // .map_err(AgentFilterChainError::from) } diff --git a/crates/brightstaff/src/handlers/response_handler.rs b/crates/brightstaff/src/handlers/response_handler.rs index 9dde3c23..96c7e4da 100644 --- a/crates/brightstaff/src/handlers/response_handler.rs +++ b/crates/brightstaff/src/handlers/response_handler.rs @@ -120,114 +120,9 @@ impl ResponseHandler { .map_err(ResponseError::from) } - /// Create a streaming response with reasoning blocks for debug mode - pub async fn create_streaming_response_with_reasoning( - &self, - chat_request: &ChatCompletionsRequest, - selected_agent: &AgentFilterChain, - processed_messages: &[hermesllm::apis::openai::Message], - llm_response: reqwest::Response, - ) -> Result>, ResponseError> { - // Create reasoning content - let mut reasoning_content = String::new(); - - reasoning_content.push_str(&format!( - "Starting agent processing pipeline for query: \"{}\"\n\n", - chat_request - .messages - .last() - .map(|m| match &m.content { - hermesllm::apis::openai::MessageContent::Text(text) => text.as_str(), - _ => "complex content", - }) - .unwrap_or("unknown") - )); - - reasoning_content.push_str(&format!( - "Selected agent filter chain: {} {}\n", - selected_agent.id, - selected_agent.description.as_deref().unwrap_or("") - )); - - if !selected_agent.filter_chain.is_empty() { - reasoning_content.push_str(&format!( - "Processed {} filter agents in sequence:\n", - selected_agent.filter_chain.len() - )); - - for (index, filter_name) in selected_agent.filter_chain.iter().enumerate() { - reasoning_content.push_str(&format!( - "{}. āœ“ Filter agent: {} completed\n", - index + 1, - filter_name - )); - } - } else { - reasoning_content - .push_str("No filter agents configured, proceeded directly to terminal agent\n"); - } - - reasoning_content.push_str(&format!( - "\nFilter chain processing completed successfully\n" - )); - reasoning_content.push_str(&format!( - "Final message count: {}\n", - processed_messages.len() - )); - reasoning_content.push_str("Now streaming response from terminal agent...\n\n"); - - // Create channel for streaming - let (tx, rx) = mpsc::channel::>(100); - - // Send reasoning block first - let reasoning_chunk = Self::create_reasoning_chunk(&reasoning_content); - let _ = tx.send(Ok(Bytes::from(reasoning_chunk))).await; - - // Clone response headers for streaming - let response_headers = llm_response.headers().clone(); - - tokio::spawn(async move { - // Stream the LLM response chunks - let mut byte_stream = llm_response.bytes_stream(); - while let Some(item) = byte_stream.next().await { - match item { - Ok(chunk) => { - if tx.send(Ok(chunk)).await.is_err() { - break; - } - } - Err(_) => break, - } - } - }); - - // Create streaming response with original headers - let mut response_builder = Response::builder(); - - // Copy relevant headers from the LLM response - if let Some(headers) = response_builder.headers_mut() { - for (header_name, header_value) in response_headers.iter() { - if header_name == "content-type" || header_name == "cache-control" { - headers.insert(header_name, header_value.clone()); - } - } - } - - let stream = - ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk?))); - let stream_body = BoxBody::new(StreamBody::new(stream)); - - response_builder - .status(200) - .header("content-type", "text/event-stream") - .header("cache-control", "no-cache") - .header("connection", "keep-alive") - .body(stream_body) - .map_err(ResponseError::from) - } /// Create a streaming response with real-time reasoning blocks for debug mode - pub async fn create_streaming_response_with_realtime_reasoning( + pub async fn create_response_with_reasoning( &self, chat_request: &ChatCompletionsRequest, selected_agent: &AgentFilterChain, @@ -235,7 +130,6 @@ impl ResponseHandler { request_headers: &hyper::HeaderMap, _pipeline_processor: &PipelineProcessor, is_streaming: bool, - debug_mode: bool, ) -> Result>, ResponseError> { // Create channel for streaming let (tx, rx) = mpsc::channel::>(100); @@ -245,38 +139,27 @@ impl ResponseHandler { let selected_agent = selected_agent.clone(); let agent_map = agent_map.clone(); let request_headers = request_headers.clone(); - let url = "http://localhost:11000/v1/chat/completions".to_string(); + let internal_router_url = "http://localhost:11000/v1/chat/completions".to_string(); tokio::spawn(async move { - if debug_mode && is_streaming { + if is_streaming { // Send initial reasoning block let mut reasoning_content = String::new(); - reasoning_content.push_str(&format!( - "šŸš€ Starting agent processing pipeline for query: \"{}\"\n\n", - chat_request - .messages - .last() - .map(|m| match &m.content { - hermesllm::apis::openai::MessageContent::Text(text) => text.as_str(), - _ => "complex content", - }) - .unwrap_or("unknown") - )); reasoning_content.push_str(&format!( - "šŸŽÆ Selected agent filter chain: {} {}\n", + "Selected agent filter chain: {} {}\n", selected_agent.id, selected_agent.description.as_deref().unwrap_or("") )); if !selected_agent.filter_chain.is_empty() { reasoning_content.push_str(&format!( - "šŸ”„ Processing {} filter agents in sequence:\n", + "Processing {} filter agents in sequence:\n", selected_agent.filter_chain.len() )); } else { reasoning_content.push_str( - "⚔ No filter agents configured, proceeding directly to terminal agent\n", + "No filter agents configured, proceeding directly to final agent\n", ); } @@ -288,19 +171,19 @@ impl ResponseHandler { let mut current_messages = chat_request.messages.clone(); if !selected_agent.filter_chain.is_empty() { - for (index, filter_name) in selected_agent.filter_chain.iter().enumerate() { - if debug_mode && is_streaming { + for (index, agent_name) in selected_agent.filter_chain.iter().enumerate() { + if is_streaming { // Send reasoning update for starting this filter let filter_start_reasoning = format!( - "šŸ”„ Step {}: Calling filter agent '{}'\n", + "Step {}: Calling filter agent '{}'\n", index + 1, - filter_name + agent_name ); let filter_chunk = Self::create_reasoning_chunk(&filter_start_reasoning); let _ = tx.send(Ok(Bytes::from(filter_chunk))).await; } - if let Some(filter_agent) = agent_map.get(filter_name) { + if let Some(filter_agent) = agent_map.get(agent_name) { let start_time = std::time::Instant::now(); // Process this individual filter @@ -309,22 +192,20 @@ impl ResponseHandler { &chat_request, filter_agent, &request_headers, - &url, + &internal_router_url, ) .await { Ok(new_messages) => { current_messages = new_messages; - if debug_mode && is_streaming { + if is_streaming { let duration = start_time.elapsed(); // Send success reasoning let success_reasoning = format!( - " āœ… Filter '{}' completed in {}ms\n šŸ“Š Message count: {} → {}\n", - filter_name, + " Agent '{}' completed in {}ms\n", + agent_name, duration.as_millis(), - chat_request.messages.len(), - current_messages.len() ); let success_chunk = Self::create_reasoning_chunk(&success_reasoning); @@ -332,10 +213,10 @@ impl ResponseHandler { } } Err(e) => { - if debug_mode && is_streaming { + if is_streaming { // Send error reasoning let error_reasoning = - format!(" āŒ Filter '{}' failed: {}\n", filter_name, e); + format!(" Filter '{}' failed: {}\n", agent_name, e); let error_chunk = Self::create_reasoning_chunk(&error_reasoning); let _ = tx.send(Ok(Bytes::from(error_chunk))).await; @@ -344,11 +225,11 @@ impl ResponseHandler { } } } else { - if debug_mode && is_streaming { + if is_streaming { // Send not found reasoning let not_found_reasoning = format!( " āš ļø Filter agent '{}' not found in agent map\n", - filter_name + agent_name ); let not_found_chunk = Self::create_reasoning_chunk(¬_found_reasoning); @@ -358,10 +239,10 @@ impl ResponseHandler { } } - if debug_mode && is_streaming { + if is_streaming { // Send terminal agent reasoning let terminal_reasoning = format!( - "\nšŸŽÆ Filter chain completed! Invoking terminal agent for final response...\n\n" + "\n Filter chain completed! Invoking final agent for final response...\n\n" ); let terminal_chunk = Self::create_reasoning_chunk(&terminal_reasoning); let _ = tx.send(Ok(Bytes::from(terminal_chunk))).await; @@ -378,11 +259,8 @@ impl ResponseHandler { Err(_) => return, }; - info!("chat request string: {}", request_body); - let client = reqwest::Client::new(); let mut agent_headers = request_headers; - info!("request headers: {:?}", agent_headers); agent_headers.remove(hyper::header::CONTENT_LENGTH); agent_headers.insert( common::consts::ARCH_UPSTREAM_HOST_HEADER, @@ -391,10 +269,9 @@ impl ResponseHandler { Err(_) => return, }, ); - info!("request headers after: {:?}", agent_headers); match client - .post(&url) + .post(&internal_router_url) .headers(agent_headers) .body(request_body) .send() @@ -416,7 +293,7 @@ impl ResponseHandler { } } Err(e) => { - if debug_mode && is_streaming { + if is_streaming { let error_reasoning = format!("āŒ Terminal agent error: {}\n", e); let error_chunk = Self::create_reasoning_chunk(&error_reasoning); let _ = tx.send(Ok(Bytes::from(error_chunk))).await; diff --git a/demos/use_cases/rag_agent/test.rest b/demos/use_cases/rag_agent/test.rest index 443e92b8..f3d68c54 100644 --- a/demos/use_cases/rag_agent/test.rest +++ b/demos/use_cases/rag_agent/test.rest @@ -1,5 +1,4 @@ @baseUrl = http://0.0.0.0:10502 -@model = gpt-4o # Health Check GET {{baseUrl}}/health @@ -11,7 +10,7 @@ POST {{baseUrl}}/v1/chat/completions Content-Type: application/json { - "model": "{{model}}", + "model": "gpt-4o", "messages": [ { "role": "user", @@ -27,7 +26,7 @@ POST {{baseUrl}}/v1/chat/completions Content-Type: application/json { - "model": "{{model}}", + "model": "gpt-4o", "messages": [ { "role": "user", @@ -42,7 +41,7 @@ POST http://localhost:8001/v1/chat/completions Content-Type: application/json { - "model": "{{model}}", + "model": "gpt-4o", "messages": [ { "role": "user", @@ -72,7 +71,7 @@ Content-Type: application/json x-debug-mode: true { - "model": "{{model}}", + "model": "gpt-4o", "messages": [ { "role": "user", @@ -88,7 +87,7 @@ Content-Type: application/json X-Debug-Mode: true { - "model": "{{model}}", + "model": "gpt-4o", "messages": [ { "role": "user", @@ -101,7 +100,6 @@ X-Debug-Mode: true ### Test debug mode without streaming (should work normally) POST http://localhost:8001/v1/chat/completions Content-Type: application/json -x-debug-mode: true { "model": "gpt-4o",