revert code changes

This commit is contained in:
Adil Hafeez 2025-10-22 15:56:09 -07:00
parent 8d80679822
commit fb6e7fba6e
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
2 changed files with 26 additions and 287 deletions

View file

@ -118,9 +118,6 @@ async fn handle_agent_chat(
AgentFilterChainError::RequestParsing(err)
})?;
// Check if streaming is enabled
let is_streaming = chat_completions_request.stream.unwrap_or(false);
// Extract trace parent for routing
let trace_parent = request_headers
.iter()
@ -141,16 +138,35 @@ async fn handle_agent_chat(
agent_selector.create_agent_map(agents)
};
return response_handler
.create_response_with_reasoning(
// Process the filter chain
let processed_messages = pipeline_processor
.process_filter_chain(
&chat_completions_request,
&selected_agent,
&agent_map,
&request_headers,
&pipeline_processor,
is_streaming,
)
.await
.map_err(AgentFilterChainError::from);
.await?;
// Get terminal agent and send final response
let terminal_agent_name = selected_agent.id;
let terminal_agent = agent_map.get(&terminal_agent_name).unwrap();
debug!("Processing terminal agent: {}", terminal_agent_name);
debug!("Terminal agent details: {:?}", terminal_agent);
let llm_response = pipeline_processor
.invoke_upstream_agent(
&processed_messages,
&chat_completions_request,
terminal_agent,
&request_headers,
)
.await?;
// Create streaming response
response_handler
.create_streaming_response(llm_response)
.await
.map_err(AgentFilterChainError::from)
}

View file

@ -3,16 +3,10 @@ use http_body_util::combinators::BoxBody;
use http_body_util::{BodyExt, Full, StreamBody};
use hyper::body::Frame;
use hyper::{Response, StatusCode};
use std::collections::HashMap;
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;
use tokio_stream::StreamExt;
use tracing::{info, warn};
// Re-export necessary types for the reasoning implementation
use super::pipeline_processor::PipelineProcessor;
use common::configuration::{Agent, AgentFilterChain};
use hermesllm::apis::openai::ChatCompletionsRequest;
use tracing::warn;
/// Errors that can occur during response handling
#[derive(Debug, thiserror::Error)]
@ -119,277 +113,6 @@ impl ResponseHandler {
.body(stream_body)
.map_err(ResponseError::from)
}
/// Create a streaming response with real-time reasoning blocks for debug mode
pub async fn create_response_with_reasoning(
&self,
chat_request: &ChatCompletionsRequest,
selected_agent: &AgentFilterChain,
agent_map: &HashMap<String, Agent>,
request_headers: &hyper::HeaderMap,
_pipeline_processor: &PipelineProcessor,
is_streaming: bool,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ResponseError> {
// Create channel for streaming
let (tx, rx) = mpsc::channel::<Result<Bytes, hyper::Error>>(100);
// Clone necessary data for the async task
let chat_request = chat_request.clone();
let selected_agent = selected_agent.clone();
let agent_map = agent_map.clone();
let request_headers = request_headers.clone();
let internal_router_url = "http://localhost:11000/v1/chat/completions".to_string();
tokio::spawn(async move {
if is_streaming {
// Send initial reasoning block
let mut reasoning_content = String::new();
reasoning_content.push_str(&format!(
"Selected agent filter chain: {} {}\n",
selected_agent.id,
selected_agent.description.as_deref().unwrap_or("")
));
if !selected_agent.filter_chain.is_empty() {
reasoning_content.push_str(&format!(
"Processing {} filter agents in sequence:\n",
selected_agent.filter_chain.len()
));
} else {
reasoning_content.push_str(
"No filter agents configured, proceeding directly to final agent\n",
);
}
let reasoning_chunk = Self::create_reasoning_chunk(&reasoning_content);
let _ = tx.send(Ok(Bytes::from(reasoning_chunk))).await;
}
// Process filter chain step by step with real-time updates
let mut current_messages = chat_request.messages.clone();
if !selected_agent.filter_chain.is_empty() {
for (index, agent_name) in selected_agent.filter_chain.iter().enumerate() {
if is_streaming {
// Send reasoning update for starting this filter
let filter_start_reasoning = format!(
"Step {}: Calling filter agent '{}'\n",
index + 1,
agent_name
);
let filter_chunk = Self::create_reasoning_chunk(&filter_start_reasoning);
let _ = tx.send(Ok(Bytes::from(filter_chunk))).await;
}
if let Some(filter_agent) = agent_map.get(agent_name) {
let start_time = std::time::Instant::now();
// Process this individual filter
match Self::process_single_filter(
&current_messages,
&chat_request,
filter_agent,
&request_headers,
&internal_router_url,
)
.await
{
Ok(new_messages) => {
current_messages = new_messages;
if is_streaming {
let duration = start_time.elapsed();
// Send success reasoning
let success_reasoning = format!(
" Agent '{}' completed in {}ms\n",
agent_name,
duration.as_millis(),
);
let success_chunk =
Self::create_reasoning_chunk(&success_reasoning);
let _ = tx.send(Ok(Bytes::from(success_chunk))).await;
}
}
Err(e) => {
if is_streaming {
// Send error reasoning
let error_reasoning =
format!(" Filter '{}' failed: {}\n", agent_name, e);
let error_chunk =
Self::create_reasoning_chunk(&error_reasoning);
let _ = tx.send(Ok(Bytes::from(error_chunk))).await;
}
return;
}
}
} else {
if is_streaming {
// Send not found reasoning
let not_found_reasoning = format!(
" ⚠️ Filter agent '{}' not found in agent map\n",
agent_name
);
let not_found_chunk =
Self::create_reasoning_chunk(&not_found_reasoning);
let _ = tx.send(Ok(Bytes::from(not_found_chunk))).await;
}
}
}
}
if is_streaming {
// Send terminal agent reasoning
let terminal_reasoning = format!(
"\n Filter chain completed! Invoking final agent for final response...\n\n"
);
let terminal_chunk = Self::create_reasoning_chunk(&terminal_reasoning);
let _ = tx.send(Ok(Bytes::from(terminal_chunk))).await;
}
// Get terminal agent and invoke it
if let Some(terminal_agent) = agent_map.get(&selected_agent.id) {
// Create request for terminal agent
let mut terminal_request = chat_request;
terminal_request.messages = current_messages;
let request_body = match serde_json::to_string(&terminal_request) {
Ok(body) => body,
Err(_) => return,
};
let client = reqwest::Client::new();
let mut agent_headers = request_headers;
agent_headers.remove(hyper::header::CONTENT_LENGTH);
agent_headers.insert(
common::consts::ARCH_UPSTREAM_HOST_HEADER,
match hyper::header::HeaderValue::from_str(&terminal_agent.id) {
Ok(val) => val,
Err(_) => return,
},
);
match client
.post(&internal_router_url)
.headers(agent_headers)
.body(request_body)
.send()
.await
{
Ok(response) => {
// Stream the terminal agent response
let mut byte_stream = response.bytes_stream();
while let Some(item) = byte_stream.next().await {
match item {
Ok(chunk) => {
info!("Streaming terminal agent chunk len: {}", chunk.len());
if tx.send(Ok(chunk)).await.is_err() {
break;
}
}
Err(_) => break,
}
}
}
Err(e) => {
if is_streaming {
let error_reasoning = format!("❌ Terminal agent error: {}\n", e);
let error_chunk = Self::create_reasoning_chunk(&error_reasoning);
let _ = tx.send(Ok(Bytes::from(error_chunk))).await;
}
}
}
}
});
// Create streaming response
let stream =
ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk?)));
let stream_body = BoxBody::new(StreamBody::new(stream));
Ok(Response::builder()
.status(200)
.header("content-type", "text/event-stream")
.header("cache-control", "no-cache")
.header("connection", "keep-alive")
.body(stream_body)?)
}
async fn process_single_filter(
messages: &[hermesllm::apis::openai::Message],
original_request: &ChatCompletionsRequest,
agent: &Agent,
request_headers: &hyper::HeaderMap,
url: &str,
) -> Result<Vec<hermesllm::apis::openai::Message>, Box<dyn std::error::Error + Send + Sync>>
{
let mut request = original_request.clone();
request.messages = messages.to_vec();
let request_body = serde_json::to_string(&request)?;
let client = reqwest::Client::new();
let mut agent_headers = request_headers.clone();
agent_headers.remove(hyper::header::CONTENT_LENGTH);
agent_headers.insert(
common::consts::ARCH_UPSTREAM_HOST_HEADER,
hyper::header::HeaderValue::from_str(&agent.id)?,
);
agent_headers.insert(
common::consts::ENVOY_RETRY_HEADER,
hyper::header::HeaderValue::from_str("3")?,
);
let response = client
.post(url)
.headers(agent_headers)
.body(request_body)
.send()
.await?;
let response_bytes = response.bytes().await?;
let response_json: serde_json::Value = serde_json::from_slice(&response_bytes)?;
let content = response_json
.get("choices")
.and_then(|choices| choices.as_array())
.and_then(|choices| choices.first())
.and_then(|choice| choice.get("message"))
.and_then(|message| message.get("content"))
.and_then(|content| content.as_str())
.ok_or("No content in response")?;
// Parse the response content as new message history
let new_messages: Vec<hermesllm::apis::openai::Message> = serde_json::from_str(content)?;
Ok(new_messages)
}
fn create_reasoning_chunk(reasoning_content: &str) -> String {
let reasoning_chunk = serde_json::json!({
"choices": [{
"delta": {
"reasoning": reasoning_content
},
"index": 0,
"finish_reason": null
}],
"created": std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
"id": format!("reasoning-{}", std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_nanos()),
"model": "agent-pipeline",
"object": "chat.completion.chunk"
});
format!("data: {}\n\n", reasoning_chunk)
}
}
impl Default for ResponseHandler {