diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index 6884ccbd..a1a00f88 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -118,9 +118,6 @@ async fn handle_agent_chat( AgentFilterChainError::RequestParsing(err) })?; - // Check if streaming is enabled - let is_streaming = chat_completions_request.stream.unwrap_or(false); - // Extract trace parent for routing let trace_parent = request_headers .iter() @@ -141,16 +138,35 @@ async fn handle_agent_chat( agent_selector.create_agent_map(agents) }; - return response_handler - .create_response_with_reasoning( + // Process the filter chain + let processed_messages = pipeline_processor + .process_filter_chain( &chat_completions_request, &selected_agent, &agent_map, &request_headers, - &pipeline_processor, - is_streaming, ) - .await - .map_err(AgentFilterChainError::from); + .await?; + // Get terminal agent and send final response + let terminal_agent_name = selected_agent.id; + let terminal_agent = agent_map.get(&terminal_agent_name).unwrap(); + + debug!("Processing terminal agent: {}", terminal_agent_name); + debug!("Terminal agent details: {:?}", terminal_agent); + + let llm_response = pipeline_processor + .invoke_upstream_agent( + &processed_messages, + &chat_completions_request, + terminal_agent, + &request_headers, + ) + .await?; + + // Create streaming response + response_handler + .create_streaming_response(llm_response) + .await + .map_err(AgentFilterChainError::from) } diff --git a/crates/brightstaff/src/handlers/response_handler.rs b/crates/brightstaff/src/handlers/response_handler.rs index 96c7e4da..2d647d2c 100644 --- a/crates/brightstaff/src/handlers/response_handler.rs +++ b/crates/brightstaff/src/handlers/response_handler.rs @@ -3,16 +3,10 @@ use http_body_util::combinators::BoxBody; use http_body_util::{BodyExt, Full, StreamBody}; use hyper::body::Frame; use hyper::{Response, StatusCode}; -use std::collections::HashMap; use tokio::sync::mpsc; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; -use tracing::{info, warn}; - -// Re-export necessary types for the reasoning implementation -use super::pipeline_processor::PipelineProcessor; -use common::configuration::{Agent, AgentFilterChain}; -use hermesllm::apis::openai::ChatCompletionsRequest; +use tracing::warn; /// Errors that can occur during response handling #[derive(Debug, thiserror::Error)] @@ -119,277 +113,6 @@ impl ResponseHandler { .body(stream_body) .map_err(ResponseError::from) } - - - /// Create a streaming response with real-time reasoning blocks for debug mode - pub async fn create_response_with_reasoning( - &self, - chat_request: &ChatCompletionsRequest, - selected_agent: &AgentFilterChain, - agent_map: &HashMap, - request_headers: &hyper::HeaderMap, - _pipeline_processor: &PipelineProcessor, - is_streaming: bool, - ) -> Result>, ResponseError> { - // Create channel for streaming - let (tx, rx) = mpsc::channel::>(100); - - // Clone necessary data for the async task - let chat_request = chat_request.clone(); - let selected_agent = selected_agent.clone(); - let agent_map = agent_map.clone(); - let request_headers = request_headers.clone(); - let internal_router_url = "http://localhost:11000/v1/chat/completions".to_string(); - - tokio::spawn(async move { - if is_streaming { - // Send initial reasoning block - let mut reasoning_content = String::new(); - - reasoning_content.push_str(&format!( - "Selected agent filter chain: {} {}\n", - selected_agent.id, - selected_agent.description.as_deref().unwrap_or("") - )); - - if !selected_agent.filter_chain.is_empty() { - reasoning_content.push_str(&format!( - "Processing {} filter agents in sequence:\n", - selected_agent.filter_chain.len() - )); - } else { - reasoning_content.push_str( - "No filter agents configured, proceeding directly to final agent\n", - ); - } - - let reasoning_chunk = Self::create_reasoning_chunk(&reasoning_content); - let _ = tx.send(Ok(Bytes::from(reasoning_chunk))).await; - } - - // Process filter chain step by step with real-time updates - let mut current_messages = chat_request.messages.clone(); - - if !selected_agent.filter_chain.is_empty() { - for (index, agent_name) in selected_agent.filter_chain.iter().enumerate() { - if is_streaming { - // Send reasoning update for starting this filter - let filter_start_reasoning = format!( - "Step {}: Calling filter agent '{}'\n", - index + 1, - agent_name - ); - let filter_chunk = Self::create_reasoning_chunk(&filter_start_reasoning); - let _ = tx.send(Ok(Bytes::from(filter_chunk))).await; - } - - if let Some(filter_agent) = agent_map.get(agent_name) { - let start_time = std::time::Instant::now(); - - // Process this individual filter - match Self::process_single_filter( - ¤t_messages, - &chat_request, - filter_agent, - &request_headers, - &internal_router_url, - ) - .await - { - Ok(new_messages) => { - current_messages = new_messages; - if is_streaming { - let duration = start_time.elapsed(); - - // Send success reasoning - let success_reasoning = format!( - " Agent '{}' completed in {}ms\n", - agent_name, - duration.as_millis(), - ); - let success_chunk = - Self::create_reasoning_chunk(&success_reasoning); - let _ = tx.send(Ok(Bytes::from(success_chunk))).await; - } - } - Err(e) => { - if is_streaming { - // Send error reasoning - let error_reasoning = - format!(" Filter '{}' failed: {}\n", agent_name, e); - let error_chunk = - Self::create_reasoning_chunk(&error_reasoning); - let _ = tx.send(Ok(Bytes::from(error_chunk))).await; - } - return; - } - } - } else { - if is_streaming { - // Send not found reasoning - let not_found_reasoning = format!( - " ⚠️ Filter agent '{}' not found in agent map\n", - agent_name - ); - let not_found_chunk = - Self::create_reasoning_chunk(¬_found_reasoning); - let _ = tx.send(Ok(Bytes::from(not_found_chunk))).await; - } - } - } - } - - if is_streaming { - // Send terminal agent reasoning - let terminal_reasoning = format!( - "\n Filter chain completed! Invoking final agent for final response...\n\n" - ); - let terminal_chunk = Self::create_reasoning_chunk(&terminal_reasoning); - let _ = tx.send(Ok(Bytes::from(terminal_chunk))).await; - } - - // Get terminal agent and invoke it - if let Some(terminal_agent) = agent_map.get(&selected_agent.id) { - // Create request for terminal agent - let mut terminal_request = chat_request; - terminal_request.messages = current_messages; - - let request_body = match serde_json::to_string(&terminal_request) { - Ok(body) => body, - Err(_) => return, - }; - - let client = reqwest::Client::new(); - let mut agent_headers = request_headers; - agent_headers.remove(hyper::header::CONTENT_LENGTH); - agent_headers.insert( - common::consts::ARCH_UPSTREAM_HOST_HEADER, - match hyper::header::HeaderValue::from_str(&terminal_agent.id) { - Ok(val) => val, - Err(_) => return, - }, - ); - - match client - .post(&internal_router_url) - .headers(agent_headers) - .body(request_body) - .send() - .await - { - Ok(response) => { - // Stream the terminal agent response - let mut byte_stream = response.bytes_stream(); - while let Some(item) = byte_stream.next().await { - match item { - Ok(chunk) => { - info!("Streaming terminal agent chunk len: {}", chunk.len()); - if tx.send(Ok(chunk)).await.is_err() { - break; - } - } - Err(_) => break, - } - } - } - Err(e) => { - if is_streaming { - let error_reasoning = format!("❌ Terminal agent error: {}\n", e); - let error_chunk = Self::create_reasoning_chunk(&error_reasoning); - let _ = tx.send(Ok(Bytes::from(error_chunk))).await; - } - } - } - } - }); - - // Create streaming response - let stream = - ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk?))); - let stream_body = BoxBody::new(StreamBody::new(stream)); - - Ok(Response::builder() - .status(200) - .header("content-type", "text/event-stream") - .header("cache-control", "no-cache") - .header("connection", "keep-alive") - .body(stream_body)?) - } - - async fn process_single_filter( - messages: &[hermesllm::apis::openai::Message], - original_request: &ChatCompletionsRequest, - agent: &Agent, - request_headers: &hyper::HeaderMap, - url: &str, - ) -> Result, Box> - { - let mut request = original_request.clone(); - request.messages = messages.to_vec(); - - let request_body = serde_json::to_string(&request)?; - let client = reqwest::Client::new(); - - let mut agent_headers = request_headers.clone(); - agent_headers.remove(hyper::header::CONTENT_LENGTH); - agent_headers.insert( - common::consts::ARCH_UPSTREAM_HOST_HEADER, - hyper::header::HeaderValue::from_str(&agent.id)?, - ); - - agent_headers.insert( - common::consts::ENVOY_RETRY_HEADER, - hyper::header::HeaderValue::from_str("3")?, - ); - - let response = client - .post(url) - .headers(agent_headers) - .body(request_body) - .send() - .await?; - - let response_bytes = response.bytes().await?; - let response_json: serde_json::Value = serde_json::from_slice(&response_bytes)?; - - let content = response_json - .get("choices") - .and_then(|choices| choices.as_array()) - .and_then(|choices| choices.first()) - .and_then(|choice| choice.get("message")) - .and_then(|message| message.get("content")) - .and_then(|content| content.as_str()) - .ok_or("No content in response")?; - - // Parse the response content as new message history - let new_messages: Vec = serde_json::from_str(content)?; - - Ok(new_messages) - } - - fn create_reasoning_chunk(reasoning_content: &str) -> String { - let reasoning_chunk = serde_json::json!({ - "choices": [{ - "delta": { - "reasoning": reasoning_content - }, - "index": 0, - "finish_reason": null - }], - "created": std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_secs(), - "id": format!("reasoning-{}", std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap_or_default() - .as_nanos()), - "model": "agent-pipeline", - "object": "chat.completion.chunk" - }); - - format!("data: {}\n\n", reasoning_chunk) - } } impl Default for ResponseHandler {