pending changes

This commit is contained in:
Adil Hafeez 2025-10-22 15:55:13 -07:00
parent c3c161ca4c
commit 8d80679822
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
3 changed files with 29 additions and 191 deletions

View file

@ -118,13 +118,6 @@ async fn handle_agent_chat(
AgentFilterChainError::RequestParsing(err)
})?;
// Check for debug/reasoning mode, by default its enabled
let debug_mode = request_headers
.get("x-debug-mode")
.and_then(|v| v.to_str().ok())
.map(|v| v == "true")
.unwrap_or(true);
// Check if streaming is enabled
let is_streaming = chat_completions_request.stream.unwrap_or(false);
@ -149,45 +142,15 @@ async fn handle_agent_chat(
};
return response_handler
.create_streaming_response_with_realtime_reasoning(
.create_response_with_reasoning(
&chat_completions_request,
&selected_agent,
&agent_map,
&request_headers,
&pipeline_processor,
is_streaming,
debug_mode,
)
.await
.map_err(AgentFilterChainError::from);
// // For normal mode, process pipeline and stream response
// let processed_messages = pipeline_processor
// .process_filter_chain(
// &chat_completions_request,
// &selected_agent,
// &agent_map,
// &request_headers,
// )
// .await?;
// // Get terminal agent and send final response
// let terminal_agent = agent_map.get(&selected_agent.id).unwrap();
// debug!("Processing terminal agent: {}", selected_agent.id);
// debug!("Terminal agent details: {:?}", terminal_agent);
// let llm_response = pipeline_processor
// .invoke_upstream_agent(
// &processed_messages,
// &chat_completions_request,
// terminal_agent,
// &request_headers,
// )
// .await?;
// response_handler
// .create_streaming_response(llm_response)
// .await
// .map_err(AgentFilterChainError::from)
}

View file

@ -120,114 +120,9 @@ impl ResponseHandler {
.map_err(ResponseError::from)
}
/// Create a streaming response with reasoning blocks for debug mode
pub async fn create_streaming_response_with_reasoning(
&self,
chat_request: &ChatCompletionsRequest,
selected_agent: &AgentFilterChain,
processed_messages: &[hermesllm::apis::openai::Message],
llm_response: reqwest::Response,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ResponseError> {
// Create reasoning content
let mut reasoning_content = String::new();
reasoning_content.push_str(&format!(
"Starting agent processing pipeline for query: \"{}\"\n\n",
chat_request
.messages
.last()
.map(|m| match &m.content {
hermesllm::apis::openai::MessageContent::Text(text) => text.as_str(),
_ => "complex content",
})
.unwrap_or("unknown")
));
reasoning_content.push_str(&format!(
"Selected agent filter chain: {} {}\n",
selected_agent.id,
selected_agent.description.as_deref().unwrap_or("")
));
if !selected_agent.filter_chain.is_empty() {
reasoning_content.push_str(&format!(
"Processed {} filter agents in sequence:\n",
selected_agent.filter_chain.len()
));
for (index, filter_name) in selected_agent.filter_chain.iter().enumerate() {
reasoning_content.push_str(&format!(
"{}. ✓ Filter agent: {} completed\n",
index + 1,
filter_name
));
}
} else {
reasoning_content
.push_str("No filter agents configured, proceeded directly to terminal agent\n");
}
reasoning_content.push_str(&format!(
"\nFilter chain processing completed successfully\n"
));
reasoning_content.push_str(&format!(
"Final message count: {}\n",
processed_messages.len()
));
reasoning_content.push_str("Now streaming response from terminal agent...\n\n");
// Create channel for streaming
let (tx, rx) = mpsc::channel::<Result<Bytes, hyper::Error>>(100);
// Send reasoning block first
let reasoning_chunk = Self::create_reasoning_chunk(&reasoning_content);
let _ = tx.send(Ok(Bytes::from(reasoning_chunk))).await;
// Clone response headers for streaming
let response_headers = llm_response.headers().clone();
tokio::spawn(async move {
// Stream the LLM response chunks
let mut byte_stream = llm_response.bytes_stream();
while let Some(item) = byte_stream.next().await {
match item {
Ok(chunk) => {
if tx.send(Ok(chunk)).await.is_err() {
break;
}
}
Err(_) => break,
}
}
});
// Create streaming response with original headers
let mut response_builder = Response::builder();
// Copy relevant headers from the LLM response
if let Some(headers) = response_builder.headers_mut() {
for (header_name, header_value) in response_headers.iter() {
if header_name == "content-type" || header_name == "cache-control" {
headers.insert(header_name, header_value.clone());
}
}
}
let stream =
ReceiverStream::new(rx).map(|chunk| Ok::<_, hyper::Error>(Frame::data(chunk?)));
let stream_body = BoxBody::new(StreamBody::new(stream));
response_builder
.status(200)
.header("content-type", "text/event-stream")
.header("cache-control", "no-cache")
.header("connection", "keep-alive")
.body(stream_body)
.map_err(ResponseError::from)
}
/// Create a streaming response with real-time reasoning blocks for debug mode
pub async fn create_streaming_response_with_realtime_reasoning(
pub async fn create_response_with_reasoning(
&self,
chat_request: &ChatCompletionsRequest,
selected_agent: &AgentFilterChain,
@ -235,7 +130,6 @@ impl ResponseHandler {
request_headers: &hyper::HeaderMap,
_pipeline_processor: &PipelineProcessor,
is_streaming: bool,
debug_mode: bool,
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, ResponseError> {
// Create channel for streaming
let (tx, rx) = mpsc::channel::<Result<Bytes, hyper::Error>>(100);
@ -245,38 +139,27 @@ impl ResponseHandler {
let selected_agent = selected_agent.clone();
let agent_map = agent_map.clone();
let request_headers = request_headers.clone();
let url = "http://localhost:11000/v1/chat/completions".to_string();
let internal_router_url = "http://localhost:11000/v1/chat/completions".to_string();
tokio::spawn(async move {
if debug_mode && is_streaming {
if is_streaming {
// Send initial reasoning block
let mut reasoning_content = String::new();
reasoning_content.push_str(&format!(
"🚀 Starting agent processing pipeline for query: \"{}\"\n\n",
chat_request
.messages
.last()
.map(|m| match &m.content {
hermesllm::apis::openai::MessageContent::Text(text) => text.as_str(),
_ => "complex content",
})
.unwrap_or("unknown")
));
reasoning_content.push_str(&format!(
"🎯 Selected agent filter chain: {} {}\n",
"Selected agent filter chain: {} {}\n",
selected_agent.id,
selected_agent.description.as_deref().unwrap_or("")
));
if !selected_agent.filter_chain.is_empty() {
reasoning_content.push_str(&format!(
"🔄 Processing {} filter agents in sequence:\n",
"Processing {} filter agents in sequence:\n",
selected_agent.filter_chain.len()
));
} else {
reasoning_content.push_str(
"No filter agents configured, proceeding directly to terminal agent\n",
"No filter agents configured, proceeding directly to final agent\n",
);
}
@ -288,19 +171,19 @@ impl ResponseHandler {
let mut current_messages = chat_request.messages.clone();
if !selected_agent.filter_chain.is_empty() {
for (index, filter_name) in selected_agent.filter_chain.iter().enumerate() {
if debug_mode && is_streaming {
for (index, agent_name) in selected_agent.filter_chain.iter().enumerate() {
if is_streaming {
// Send reasoning update for starting this filter
let filter_start_reasoning = format!(
"🔄 Step {}: Calling filter agent '{}'\n",
"Step {}: Calling filter agent '{}'\n",
index + 1,
filter_name
agent_name
);
let filter_chunk = Self::create_reasoning_chunk(&filter_start_reasoning);
let _ = tx.send(Ok(Bytes::from(filter_chunk))).await;
}
if let Some(filter_agent) = agent_map.get(filter_name) {
if let Some(filter_agent) = agent_map.get(agent_name) {
let start_time = std::time::Instant::now();
// Process this individual filter
@ -309,22 +192,20 @@ impl ResponseHandler {
&chat_request,
filter_agent,
&request_headers,
&url,
&internal_router_url,
)
.await
{
Ok(new_messages) => {
current_messages = new_messages;
if debug_mode && is_streaming {
if is_streaming {
let duration = start_time.elapsed();
// Send success reasoning
let success_reasoning = format!(
" ✅ Filter '{}' completed in {}ms\n 📊 Message count: {} → {}\n",
filter_name,
" Agent '{}' completed in {}ms\n",
agent_name,
duration.as_millis(),
chat_request.messages.len(),
current_messages.len()
);
let success_chunk =
Self::create_reasoning_chunk(&success_reasoning);
@ -332,10 +213,10 @@ impl ResponseHandler {
}
}
Err(e) => {
if debug_mode && is_streaming {
if is_streaming {
// Send error reasoning
let error_reasoning =
format!(" Filter '{}' failed: {}\n", filter_name, e);
format!(" Filter '{}' failed: {}\n", agent_name, e);
let error_chunk =
Self::create_reasoning_chunk(&error_reasoning);
let _ = tx.send(Ok(Bytes::from(error_chunk))).await;
@ -344,11 +225,11 @@ impl ResponseHandler {
}
}
} else {
if debug_mode && is_streaming {
if is_streaming {
// Send not found reasoning
let not_found_reasoning = format!(
" ⚠️ Filter agent '{}' not found in agent map\n",
filter_name
agent_name
);
let not_found_chunk =
Self::create_reasoning_chunk(&not_found_reasoning);
@ -358,10 +239,10 @@ impl ResponseHandler {
}
}
if debug_mode && is_streaming {
if is_streaming {
// Send terminal agent reasoning
let terminal_reasoning = format!(
"\n🎯 Filter chain completed! Invoking terminal agent for final response...\n\n"
"\n Filter chain completed! Invoking final agent for final response...\n\n"
);
let terminal_chunk = Self::create_reasoning_chunk(&terminal_reasoning);
let _ = tx.send(Ok(Bytes::from(terminal_chunk))).await;
@ -378,11 +259,8 @@ impl ResponseHandler {
Err(_) => return,
};
info!("chat request string: {}", request_body);
let client = reqwest::Client::new();
let mut agent_headers = request_headers;
info!("request headers: {:?}", agent_headers);
agent_headers.remove(hyper::header::CONTENT_LENGTH);
agent_headers.insert(
common::consts::ARCH_UPSTREAM_HOST_HEADER,
@ -391,10 +269,9 @@ impl ResponseHandler {
Err(_) => return,
},
);
info!("request headers after: {:?}", agent_headers);
match client
.post(&url)
.post(&internal_router_url)
.headers(agent_headers)
.body(request_body)
.send()
@ -416,7 +293,7 @@ impl ResponseHandler {
}
}
Err(e) => {
if debug_mode && is_streaming {
if is_streaming {
let error_reasoning = format!("❌ Terminal agent error: {}\n", e);
let error_chunk = Self::create_reasoning_chunk(&error_reasoning);
let _ = tx.send(Ok(Bytes::from(error_chunk))).await;

View file

@ -1,5 +1,4 @@
@baseUrl = http://0.0.0.0:10502
@model = gpt-4o
# Health Check
GET {{baseUrl}}/health
@ -11,7 +10,7 @@ POST {{baseUrl}}/v1/chat/completions
Content-Type: application/json
{
"model": "{{model}}",
"model": "gpt-4o",
"messages": [
{
"role": "user",
@ -27,7 +26,7 @@ POST {{baseUrl}}/v1/chat/completions
Content-Type: application/json
{
"model": "{{model}}",
"model": "gpt-4o",
"messages": [
{
"role": "user",
@ -42,7 +41,7 @@ POST http://localhost:8001/v1/chat/completions
Content-Type: application/json
{
"model": "{{model}}",
"model": "gpt-4o",
"messages": [
{
"role": "user",
@ -72,7 +71,7 @@ Content-Type: application/json
x-debug-mode: true
{
"model": "{{model}}",
"model": "gpt-4o",
"messages": [
{
"role": "user",
@ -88,7 +87,7 @@ Content-Type: application/json
X-Debug-Mode: true
{
"model": "{{model}}",
"model": "gpt-4o",
"messages": [
{
"role": "user",
@ -101,7 +100,6 @@ X-Debug-Mode: true
### Test debug mode without streaming (should work normally)
POST http://localhost:8001/v1/chat/completions
Content-Type: application/json
x-debug-mode: true
{
"model": "gpt-4o",