rebase with main and better handle error from mcp

This commit is contained in:
Adil Hafeez 2025-12-16 00:09:24 -08:00
parent d83ffeedb3
commit 8bb64f6c62
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
2 changed files with 114 additions and 23 deletions

View file

@ -22,6 +22,12 @@ pub enum PipelineError {
NoChoicesInResponse(String),
#[error("No content in response from agent '{0}'")]
NoContentInResponse(String),
#[error("No result in response from agent '{0}'")]
NoResultInResponse(String),
#[error("No structured content in response from agent '{0}'")]
NoStructuredContentInResponse(String),
#[error("No messages in response from agent '{0}'")]
NoMessagesInResponse(String),
#[error("Client error from agent '{agent}' (HTTP {status}): {body}")]
ClientError {
agent: String,
@ -72,7 +78,6 @@ impl PipelineProcessor {
agent_map: &HashMap<String, Agent>,
request_headers: &HeaderMap,
) -> Result<Vec<Message>, PipelineError> {
let mut chat_history_updated = chat_history.to_vec();
for agent_name in &agent_filter_chain.filter_chain {
@ -84,17 +89,22 @@ impl PipelineProcessor {
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
info!("executing filter: {}/{}, url: {}, conversation length: {}", agent_name, tool_name, agent.url, chat_history.len());
info!(
"executing filter: {}/{}, url: {}, conversation length: {}",
agent_name,
tool_name,
agent.url,
chat_history.len()
);
chat_history_updated = self
.execute_filter(
&chat_history_updated,
agent,
request_headers,
)
.execute_filter(&chat_history_updated, agent, request_headers)
.await?;
info!("Received response: updated conversation length: {}", chat_history.len());
info!(
"Received response: updated conversation length: {}",
chat_history.len()
);
}
Ok(chat_history_updated)
@ -107,7 +117,6 @@ impl PipelineProcessor {
agent: &Agent,
request_headers: &HeaderMap,
) -> Result<Vec<Message>, PipelineError> {
let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) {
session_id.clone()
} else {
@ -147,7 +156,10 @@ impl PipelineProcessor {
info!("Request body (pretty):\n{}", pretty_body);
let mut agent_headers = request_headers.clone();
info!("Using MCP session ID {} for agent {}", mcp_session_id, agent.id);
info!(
"Using MCP session ID {} for agent {}",
mcp_session_id, agent.id
);
// Log all headers being sent
info!("Headers being sent:");
@ -194,38 +206,106 @@ impl PipelineProcessor {
.send()
.await?;
let status = response.status();
let http_status = response.status();
let response_bytes = response.bytes().await?;
// Check for HTTP errors and handle them appropriately
if !status.is_success() {
if !http_status.is_success() {
let error_body = String::from_utf8_lossy(&response_bytes).to_string();
if status.is_client_error() {
if http_status.is_client_error() {
// 4xx errors - cascade back to developer
return Err(PipelineError::ClientError {
agent: agent.id.clone(),
status: status.as_u16(),
status: http_status.as_u16(),
body: error_body,
});
} else if status.is_server_error() {
} else if http_status.is_server_error() {
// 5xx errors - server/agent error
return Err(PipelineError::ServerError {
agent: agent.id.clone(),
status: status.as_u16(),
status: http_status.as_u16(),
body: error_body,
});
}
}
info!(
"response bytes in str: {}",
String::from_utf8_lossy(&response_bytes)
);
let response_str = String::from_utf8_lossy(&response_bytes);
let lines: Vec<&str> = response_str.lines().collect();
// Validate SSE format: first line should be "event: message"
if lines.is_empty() || lines[0] != "event: message" {
warn!("Invalid SSE response format from agent {}: expected 'event: message' as first line, got: {:?}", agent.id, lines.first());
return Err(PipelineError::NoContentInResponse(format!(
"Invalid SSE response format from agent {}: expected 'event: message' as first line",
agent.id
)));
}
// Find the data line
let data_lines: Vec<&str> = lines
.iter()
.filter(|line| line.starts_with("data: "))
.copied()
.collect();
if data_lines.len() != 1 {
warn!(
"Expected exactly one 'data:' line from agent {}, found {}",
agent.id,
data_lines.len()
);
return Err(PipelineError::NoContentInResponse(format!(
"Expected exactly one 'data:' line from agent {}, found {}",
agent.id,
data_lines.len()
)));
}
let data_chunk = &data_lines[0][6..]; // Skip "data: " prefix
let response: JsonRpcResponse = serde_json::from_str(data_chunk)?;
let response_result = response
.result
.ok_or_else(|| PipelineError::NoResultInResponse(agent.id.clone()))?;
// check if error field is set in response result
let mcp_error = response_result
.get("isError")
.and_then(|v| v.as_bool())
.unwrap_or(false);
if mcp_error {
let error_message = response_result
.get("content")
.and_then(|v| v.as_array())
.and_then(|arr| arr.get(0))
.and_then(|v| v.get("text"))
.and_then(|v| v.as_str())
.map(|s| s.to_string())
.unwrap_or("unknown_error".to_string());
return Err(PipelineError::ClientError {
agent: agent.id.clone(),
status: http_status.as_u16(),
body: error_message,
});
}
let response_json = response_result
.get("structuredContent")
.ok_or_else(|| PipelineError::NoStructuredContentInResponse(agent.id.clone()))?;
// Parse the response as JSON to extract the content
// let response_json: serde_json::Value = serde_json::from_slice(&response_bytes)?;
let messages: Vec<Message> = response_json
.get("result")
.and_then(|v| v.as_array())
.ok_or_else(|| PipelineError::NoContentInResponse(agent.id.clone()))?
.ok_or_else(|| PipelineError::NoMessagesInResponse(agent.id.clone()))?
.iter()
.map(|msg_value| serde_json::from_value(msg_value.clone()))
.collect::<Result<Vec<Message>, _>>()
@ -283,7 +363,10 @@ impl PipelineProcessor {
.expect("No mcp-session-id in response")
.to_string();
info!("Created new MCP session for agent {}: {}", agent_id, session_id);
info!(
"Created new MCP session for agent {}: {}",
agent_id, session_id
);
// Send initialized notification (without id field per JSON-RPC 2.0 spec)
let initialized_notification = JsonRpcNotification {
@ -308,7 +391,10 @@ impl PipelineProcessor {
.await
.expect("Failed to send initialized notification");
info!("Initialized notification response status: {}", notif_response.status());
info!(
"Initialized notification response status: {}",
notif_response.status()
);
session_id
}
@ -392,7 +478,12 @@ mod tests {
let pipeline = create_test_pipeline(vec!["nonexistent-agent", "terminal-agent"]);
let result = processor
.process_filter_chain(&initial_request.messages, &pipeline, &agent_map, &request_headers)
.process_filter_chain(
&initial_request.messages,
&pipeline,
&agent_map,
&request_headers,
)
.await;
assert!(result.is_err());

View file

@ -9,7 +9,7 @@ Accept: application/json, text/event-stream
POST http://localhost:10501/mcp
Content-Type: application/json
Accept: application/json, text/event-stream
mcp-session-id: e4ec1ae904e14e06b7d194da10e5f74c
mcp-session-id: 35d455dc07b8400887f86668590f12bb
{
"jsonrpc": "2.0",
@ -70,7 +70,7 @@ accept: application/json, text/event-stream
POST http://localhost:10501/mcp
content-type: application/json
mcp-session-id: 60be9fb816304cb6b9ecdb91d89cd91f
mcp-session-id: 35d455dc07b8400887f86668590f12bb
accept: application/json, text/event-stream
{