From 2fc4070ecb44397664b6aeb88cb5b85e0d7a6e9b Mon Sep 17 00:00:00 2001 From: Adil Hafeez Date: Wed, 22 Oct 2025 09:24:28 -0700 Subject: [PATCH] pending changes --- .../src/handlers/agent_chat_completions.rs | 71 ++-- .../src/handlers/response_handler.rs | 402 +++++++++++++++++- demos/use_cases/rag_agent/test.rest | 48 +++ 3 files changed, 495 insertions(+), 26 deletions(-) diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index a1a00f88..001c9caf 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -118,6 +118,16 @@ async fn handle_agent_chat( AgentFilterChainError::RequestParsing(err) })?; + // Check for debug/reasoning mode, by default its enabled + let debug_mode = request_headers + .get("x-debug-mode") + .and_then(|v| v.to_str().ok()) + .map(|v| v == "true") + .unwrap_or(true); + + // Check if streaming is enabled + let is_streaming = chat_completions_request.stream.unwrap_or(false); + // Extract trace parent for routing let trace_parent = request_headers .iter() @@ -138,35 +148,46 @@ async fn handle_agent_chat( agent_selector.create_agent_map(agents) }; - // Process the filter chain - let processed_messages = pipeline_processor - .process_filter_chain( + return response_handler + .create_streaming_response_with_realtime_reasoning( &chat_completions_request, &selected_agent, &agent_map, &request_headers, + &pipeline_processor, + is_streaming, + debug_mode, ) - .await?; - - // Get terminal agent and send final response - let terminal_agent_name = selected_agent.id; - let terminal_agent = agent_map.get(&terminal_agent_name).unwrap(); - - debug!("Processing terminal agent: {}", terminal_agent_name); - debug!("Terminal agent details: {:?}", terminal_agent); - - let llm_response = pipeline_processor - .invoke_upstream_agent( - &processed_messages, - &chat_completions_request, - terminal_agent, - &request_headers, - ) - .await?; - - // Create streaming response - response_handler - .create_streaming_response(llm_response) .await - .map_err(AgentFilterChainError::from) + .map_err(AgentFilterChainError::from); + + // // For normal mode, process pipeline and stream response + // let processed_messages = pipeline_processor + // .process_filter_chain( + // &chat_completions_request, + // &selected_agent, + // &agent_map, + // &request_headers, + // ) + // .await?; + + // // Get terminal agent and send final response + // let terminal_agent = agent_map.get(&selected_agent.id).unwrap(); + + // debug!("Processing terminal agent: {}", selected_agent.id); + // debug!("Terminal agent details: {:?}", terminal_agent); + + // let llm_response = pipeline_processor + // .invoke_upstream_agent( + // &processed_messages, + // &chat_completions_request, + // terminal_agent, + // &request_headers, + // ) + // .await?; + + // response_handler + // .create_streaming_response(llm_response) + // .await + // .map_err(AgentFilterChainError::from) } diff --git a/crates/brightstaff/src/handlers/response_handler.rs b/crates/brightstaff/src/handlers/response_handler.rs index 2d647d2c..9dde3c23 100644 --- a/crates/brightstaff/src/handlers/response_handler.rs +++ b/crates/brightstaff/src/handlers/response_handler.rs @@ -3,10 +3,16 @@ use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full, StreamBody}; use hyper::body::Frame; use hyper::{Response, StatusCode}; +use std::collections::HashMap; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; -use tracing::warn; +use tracing::{info, warn}; + +// Re-export necessary types for the reasoning implementation +use super::pipeline_processor::PipelineProcessor; +use common::configuration::{Agent, AgentFilterChain}; +use hermesllm::apis::openai::ChatCompletionsRequest; /// Errors that can occur during response handling #[derive(Debug, thiserror::Error)] @@ -113,6 +119,400 @@ impl ResponseHandler { .body(stream_body) .map_err(ResponseError::from) } + + /// Create a streaming response with reasoning blocks for debug mode + pub async fn create_streaming_response_with_reasoning( + &self, + chat_request: &ChatCompletionsRequest, + selected_agent: &AgentFilterChain, + processed_messages: &[hermesllm::apis::openai::Message], + llm_response: reqwest::Response, + ) -> Result>, ResponseError> { + // Create reasoning content + let mut reasoning_content = String::new(); + + reasoning_content.push_str(&format!( + "Starting agent processing pipeline for query: \"{}\"\n\n", + chat_request + .messages + .last() + .map(|m| match &m.content { + hermesllm::apis::openai::MessageContent::Text(text) => text.as_str(), + _ => "complex content", + }) + .unwrap_or("unknown") + )); + + reasoning_content.push_str(&format!( + "Selected agent filter chain: {} {}\n", + selected_agent.id, + selected_agent.description.as_deref().unwrap_or("") + )); + + if !selected_agent.filter_chain.is_empty() { + reasoning_content.push_str(&format!( + "Processed {} filter agents in sequence:\n", + selected_agent.filter_chain.len() + )); + + for (index, filter_name) in selected_agent.filter_chain.iter().enumerate() { + reasoning_content.push_str(&format!( + "{}. āœ“ Filter agent: {} completed\n", + index + 1, + filter_name + )); + } + } else { + reasoning_content + .push_str("No filter agents configured, proceeded directly to terminal agent\n"); + } + + reasoning_content.push_str(&format!( + "\nFilter chain processing completed successfully\n" + )); + reasoning_content.push_str(&format!( + "Final message count: {}\n", + processed_messages.len() + )); + reasoning_content.push_str("Now streaming response from terminal agent...\n\n"); + + // Create channel for streaming + let (tx, rx) = mpsc::channel::>(100); + + // Send reasoning block first + let reasoning_chunk = Self::create_reasoning_chunk(&reasoning_content); + let _ = tx.send(Ok(Bytes::from(reasoning_chunk))).await; + + // Clone response headers for streaming + let response_headers = llm_response.headers().clone(); + + tokio::spawn(async move { + // Stream the LLM response chunks + let mut byte_stream = llm_response.bytes_stream(); + while let Some(item) = byte_stream.next().await { + match item { + Ok(chunk) => { + if tx.send(Ok(chunk)).await.is_err() { + break; + } + } + Err(_) => break, + } + } + }); + + // Create streaming response with original headers + let mut response_builder = Response::builder(); + + // Copy relevant headers from the LLM response + if let Some(headers) = response_builder.headers_mut() { + for (header_name, header_value) in response_headers.iter() { + if header_name == "content-type" || header_name == "cache-control" { + headers.insert(header_name, header_value.clone()); + } + } + } + + let stream = + ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk?))); + let stream_body = BoxBody::new(StreamBody::new(stream)); + + response_builder + .status(200) + .header("content-type", "text/event-stream") + .header("cache-control", "no-cache") + .header("connection", "keep-alive") + .body(stream_body) + .map_err(ResponseError::from) + } + + /// Create a streaming response with real-time reasoning blocks for debug mode + pub async fn create_streaming_response_with_realtime_reasoning( + &self, + chat_request: &ChatCompletionsRequest, + selected_agent: &AgentFilterChain, + agent_map: &HashMap, + request_headers: &hyper::HeaderMap, + _pipeline_processor: &PipelineProcessor, + is_streaming: bool, + debug_mode: bool, + ) -> Result>, ResponseError> { + // Create channel for streaming + let (tx, rx) = mpsc::channel::>(100); + + // Clone necessary data for the async task + let chat_request = chat_request.clone(); + let selected_agent = selected_agent.clone(); + let agent_map = agent_map.clone(); + let request_headers = request_headers.clone(); + let url = "http://localhost:11000/v1/chat/completions".to_string(); + + tokio::spawn(async move { + if debug_mode && is_streaming { + // Send initial reasoning block + let mut reasoning_content = String::new(); + reasoning_content.push_str(&format!( + "šŸš€ Starting agent processing pipeline for query: \"{}\"\n\n", + chat_request + .messages + .last() + .map(|m| match &m.content { + hermesllm::apis::openai::MessageContent::Text(text) => text.as_str(), + _ => "complex content", + }) + .unwrap_or("unknown") + )); + + reasoning_content.push_str(&format!( + "šŸŽÆ Selected agent filter chain: {} {}\n", + selected_agent.id, + selected_agent.description.as_deref().unwrap_or("") + )); + + if !selected_agent.filter_chain.is_empty() { + reasoning_content.push_str(&format!( + "šŸ”„ Processing {} filter agents in sequence:\n", + selected_agent.filter_chain.len() + )); + } else { + reasoning_content.push_str( + "⚔ No filter agents configured, proceeding directly to terminal agent\n", + ); + } + + let reasoning_chunk = Self::create_reasoning_chunk(&reasoning_content); + let _ = tx.send(Ok(Bytes::from(reasoning_chunk))).await; + } + + // Process filter chain step by step with real-time updates + let mut current_messages = chat_request.messages.clone(); + + if !selected_agent.filter_chain.is_empty() { + for (index, filter_name) in selected_agent.filter_chain.iter().enumerate() { + if debug_mode && is_streaming { + // Send reasoning update for starting this filter + let filter_start_reasoning = format!( + "šŸ”„ Step {}: Calling filter agent '{}'\n", + index + 1, + filter_name + ); + let filter_chunk = Self::create_reasoning_chunk(&filter_start_reasoning); + let _ = tx.send(Ok(Bytes::from(filter_chunk))).await; + } + + if let Some(filter_agent) = agent_map.get(filter_name) { + let start_time = std::time::Instant::now(); + + // Process this individual filter + match Self::process_single_filter( + ¤t_messages, + &chat_request, + filter_agent, + &request_headers, + &url, + ) + .await + { + Ok(new_messages) => { + current_messages = new_messages; + if debug_mode && is_streaming { + let duration = start_time.elapsed(); + + // Send success reasoning + let success_reasoning = format!( + " āœ… Filter '{}' completed in {}ms\n šŸ“Š Message count: {} → {}\n", + filter_name, + duration.as_millis(), + chat_request.messages.len(), + current_messages.len() + ); + let success_chunk = + Self::create_reasoning_chunk(&success_reasoning); + let _ = tx.send(Ok(Bytes::from(success_chunk))).await; + } + } + Err(e) => { + if debug_mode && is_streaming { + // Send error reasoning + let error_reasoning = + format!(" āŒ Filter '{}' failed: {}\n", filter_name, e); + let error_chunk = + Self::create_reasoning_chunk(&error_reasoning); + let _ = tx.send(Ok(Bytes::from(error_chunk))).await; + } + return; + } + } + } else { + if debug_mode && is_streaming { + // Send not found reasoning + let not_found_reasoning = format!( + " āš ļø Filter agent '{}' not found in agent map\n", + filter_name + ); + let not_found_chunk = + Self::create_reasoning_chunk(¬_found_reasoning); + let _ = tx.send(Ok(Bytes::from(not_found_chunk))).await; + } + } + } + } + + if debug_mode && is_streaming { + // Send terminal agent reasoning + let terminal_reasoning = format!( + "\nšŸŽÆ Filter chain completed! Invoking terminal agent for final response...\n\n" + ); + let terminal_chunk = Self::create_reasoning_chunk(&terminal_reasoning); + let _ = tx.send(Ok(Bytes::from(terminal_chunk))).await; + } + + // Get terminal agent and invoke it + if let Some(terminal_agent) = agent_map.get(&selected_agent.id) { + // Create request for terminal agent + let mut terminal_request = chat_request; + terminal_request.messages = current_messages; + + let request_body = match serde_json::to_string(&terminal_request) { + Ok(body) => body, + Err(_) => return, + }; + + info!("chat request string: {}", request_body); + + let client = reqwest::Client::new(); + let mut agent_headers = request_headers; + info!("request headers: {:?}", agent_headers); + agent_headers.remove(hyper::header::CONTENT_LENGTH); + agent_headers.insert( + common::consts::ARCH_UPSTREAM_HOST_HEADER, + match hyper::header::HeaderValue::from_str(&terminal_agent.id) { + Ok(val) => val, + Err(_) => return, + }, + ); + info!("request headers after: {:?}", agent_headers); + + match client + .post(&url) + .headers(agent_headers) + .body(request_body) + .send() + .await + { + Ok(response) => { + // Stream the terminal agent response + let mut byte_stream = response.bytes_stream(); + while let Some(item) = byte_stream.next().await { + match item { + Ok(chunk) => { + info!("Streaming terminal agent chunk len: {}", chunk.len()); + if tx.send(Ok(chunk)).await.is_err() { + break; + } + } + Err(_) => break, + } + } + } + Err(e) => { + if debug_mode && is_streaming { + let error_reasoning = format!("āŒ Terminal agent error: {}\n", e); + let error_chunk = Self::create_reasoning_chunk(&error_reasoning); + let _ = tx.send(Ok(Bytes::from(error_chunk))).await; + } + } + } + } + }); + + // Create streaming response + let stream = + ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk?))); + let stream_body = BoxBody::new(StreamBody::new(stream)); + + Ok(Response::builder() + .status(200) + .header("content-type", "text/event-stream") + .header("cache-control", "no-cache") + .header("connection", "keep-alive") + .body(stream_body)?) + } + + async fn process_single_filter( + messages: &[hermesllm::apis::openai::Message], + original_request: &ChatCompletionsRequest, + agent: &Agent, + request_headers: &hyper::HeaderMap, + url: &str, + ) -> Result, Box> + { + let mut request = original_request.clone(); + request.messages = messages.to_vec(); + + let request_body = serde_json::to_string(&request)?; + let client = reqwest::Client::new(); + + let mut agent_headers = request_headers.clone(); + agent_headers.remove(hyper::header::CONTENT_LENGTH); + agent_headers.insert( + common::consts::ARCH_UPSTREAM_HOST_HEADER, + hyper::header::HeaderValue::from_str(&agent.id)?, + ); + + agent_headers.insert( + common::consts::ENVOY_RETRY_HEADER, + hyper::header::HeaderValue::from_str("3")?, + ); + + let response = client + .post(url) + .headers(agent_headers) + .body(request_body) + .send() + .await?; + + let response_bytes = response.bytes().await?; + let response_json: serde_json::Value = serde_json::from_slice(&response_bytes)?; + + let content = response_json + .get("choices") + .and_then(|choices| choices.as_array()) + .and_then(|choices| choices.first()) + .and_then(|choice| choice.get("message")) + .and_then(|message| message.get("content")) + .and_then(|content| content.as_str()) + .ok_or("No content in response")?; + + // Parse the response content as new message history + let new_messages: Vec = serde_json::from_str(content)?; + + Ok(new_messages) + } + + fn create_reasoning_chunk(reasoning_content: &str) -> String { + let reasoning_chunk = serde_json::json!({ + "choices": [{ + "delta": { + "reasoning": reasoning_content + }, + "index": 0, + "finish_reason": null + }], + "created": std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + "id": format!("reasoning-{}", std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap_or_default() + .as_nanos()), + "model": "agent-pipeline", + "object": "chat.completion.chunk" + }); + + format!("data: {}\n\n", reasoning_chunk) + } } impl Default for ResponseHandler { diff --git a/demos/use_cases/rag_agent/test.rest b/demos/use_cases/rag_agent/test.rest index b42673dd..443e92b8 100644 --- a/demos/use_cases/rag_agent/test.rest +++ b/demos/use_cases/rag_agent/test.rest @@ -65,3 +65,51 @@ Content-Type: application/json } ] } + +### Test with debug mode and reasoning blocks (streaming) +POST http://localhost:8001/v1/chat/completions +Content-Type: application/json +x-debug-mode: true + +{ + "model": "{{model}}", + "messages": [ + { + "role": "user", + "content": "What is the guaranteed uptime percentage for TechCorp's cloud services?" + } + ], + "stream": true +} + +### Test debug mode without streaming (should work normally) +POST {{baseUrl}}/v1/chat/completions +Content-Type: application/json +X-Debug-Mode: true + +{ + "model": "{{model}}", + "messages": [ + { + "role": "user", + "content": "What is the guaranteed uptime percentage for TechCorp's cloud services?" + } + ], + "stream": false +} + +### Test debug mode without streaming (should work normally) +POST http://localhost:8001/v1/chat/completions +Content-Type: application/json +x-debug-mode: true + +{ + "model": "gpt-4o", + "messages": [ + { + "role": "user", + "content": "What is the guaranteed uptime percentage for TechCorp's cloud services?" + } + ], + "stream": true +}