diff --git a/crates/brightstaff/src/handlers/pipeline_processor.rs b/crates/brightstaff/src/handlers/pipeline_processor.rs index 0bf38da1..25e284da 100644 --- a/crates/brightstaff/src/handlers/pipeline_processor.rs +++ b/crates/brightstaff/src/handlers/pipeline_processor.rs @@ -22,6 +22,12 @@ pub enum PipelineError { NoChoicesInResponse(String), #[error("No content in response from agent '{0}'")] NoContentInResponse(String), + #[error("No result in response from agent '{0}'")] + NoResultInResponse(String), + #[error("No structured content in response from agent '{0}'")] + NoStructuredContentInResponse(String), + #[error("No messages in response from agent '{0}'")] + NoMessagesInResponse(String), #[error("Client error from agent '{agent}' (HTTP {status}): {body}")] ClientError { agent: String, @@ -72,7 +78,6 @@ impl PipelineProcessor { agent_map: &HashMap, request_headers: &HeaderMap, ) -> Result, PipelineError> { - let mut chat_history_updated = chat_history.to_vec(); for agent_name in &agent_filter_chain.filter_chain { @@ -84,17 +89,22 @@ impl PipelineProcessor { let tool_name = agent.tool.as_deref().unwrap_or(&agent.id); - info!("executing filter: {}/{}, url: {}, conversation length: {}", agent_name, tool_name, agent.url, chat_history.len()); + info!( + "executing filter: {}/{}, url: {}, conversation length: {}", + agent_name, + tool_name, + agent.url, + chat_history.len() + ); chat_history_updated = self - .execute_filter( - &chat_history_updated, - agent, - request_headers, - ) + .execute_filter(&chat_history_updated, agent, request_headers) .await?; - info!("Received response: updated conversation length: {}", chat_history.len()); + info!( + "Received response: updated conversation length: {}", + chat_history.len() + ); } Ok(chat_history_updated) @@ -107,7 +117,6 @@ impl PipelineProcessor { agent: &Agent, request_headers: &HeaderMap, ) -> Result, PipelineError> { - let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) { session_id.clone() } else { @@ -147,7 +156,10 @@ impl PipelineProcessor { info!("Request body (pretty):\n{}", pretty_body); let mut agent_headers = request_headers.clone(); - info!("Using MCP session ID {} for agent {}", mcp_session_id, agent.id); + info!( + "Using MCP session ID {} for agent {}", + mcp_session_id, agent.id + ); // Log all headers being sent info!("Headers being sent:"); @@ -194,38 +206,106 @@ impl PipelineProcessor { .send() .await?; - let status = response.status(); + let http_status = response.status(); let response_bytes = response.bytes().await?; - // Check for HTTP errors and handle them appropriately - if !status.is_success() { + if !http_status.is_success() { let error_body = String::from_utf8_lossy(&response_bytes).to_string(); - if status.is_client_error() { + if http_status.is_client_error() { // 4xx errors - cascade back to developer return Err(PipelineError::ClientError { agent: agent.id.clone(), - status: status.as_u16(), + status: http_status.as_u16(), body: error_body, }); - } else if status.is_server_error() { + } else if http_status.is_server_error() { // 5xx errors - server/agent error return Err(PipelineError::ServerError { agent: agent.id.clone(), - status: status.as_u16(), + status: http_status.as_u16(), body: error_body, }); } } + info!( + "response bytes in str: {}", + String::from_utf8_lossy(&response_bytes) + ); + let response_str = String::from_utf8_lossy(&response_bytes); + let lines: Vec<&str> = response_str.lines().collect(); + + // Validate SSE format: first line should be "event: message" + if lines.is_empty() || lines[0] != "event: message" { + warn!("Invalid SSE response format from agent {}: expected 'event: message' as first line, got: {:?}", agent.id, lines.first()); + return Err(PipelineError::NoContentInResponse(format!( + "Invalid SSE response format from agent {}: expected 'event: message' as first line", + agent.id + ))); + } + + // Find the data line + let data_lines: Vec<&str> = lines + .iter() + .filter(|line| line.starts_with("data: ")) + .copied() + .collect(); + + if data_lines.len() != 1 { + warn!( + "Expected exactly one 'data:' line from agent {}, found {}", + agent.id, + data_lines.len() + ); + return Err(PipelineError::NoContentInResponse(format!( + "Expected exactly one 'data:' line from agent {}, found {}", + agent.id, + data_lines.len() + ))); + } + + let data_chunk = &data_lines[0][6..]; // Skip "data: " prefix + + let response: JsonRpcResponse = serde_json::from_str(data_chunk)?; + let response_result = response + .result + .ok_or_else(|| PipelineError::NoResultInResponse(agent.id.clone()))?; + + // check if error field is set in response result + let mcp_error = response_result + .get("isError") + .and_then(|v| v.as_bool()) + .unwrap_or(false); + + if mcp_error { + let error_message = response_result + .get("content") + .and_then(|v| v.as_array()) + .and_then(|arr| arr.get(0)) + .and_then(|v| v.get("text")) + .and_then(|v| v.as_str()) + .map(|s| s.to_string()) + .unwrap_or("unknown_error".to_string()); + + return Err(PipelineError::ClientError { + agent: agent.id.clone(), + status: http_status.as_u16(), + body: error_message, + }); + } + + let response_json = response_result + .get("structuredContent") + .ok_or_else(|| PipelineError::NoStructuredContentInResponse(agent.id.clone()))?; // Parse the response as JSON to extract the content // let response_json: serde_json::Value = serde_json::from_slice(&response_bytes)?; let messages: Vec = response_json .get("result") .and_then(|v| v.as_array()) - .ok_or_else(|| PipelineError::NoContentInResponse(agent.id.clone()))? + .ok_or_else(|| PipelineError::NoMessagesInResponse(agent.id.clone()))? .iter() .map(|msg_value| serde_json::from_value(msg_value.clone())) .collect::, _>>() @@ -283,7 +363,10 @@ impl PipelineProcessor { .expect("No mcp-session-id in response") .to_string(); - info!("Created new MCP session for agent {}: {}", agent_id, session_id); + info!( + "Created new MCP session for agent {}: {}", + agent_id, session_id + ); // Send initialized notification (without id field per JSON-RPC 2.0 spec) let initialized_notification = JsonRpcNotification { @@ -308,7 +391,10 @@ impl PipelineProcessor { .await .expect("Failed to send initialized notification"); - info!("Initialized notification response status: {}", notif_response.status()); + info!( + "Initialized notification response status: {}", + notif_response.status() + ); session_id } @@ -392,7 +478,12 @@ mod tests { let pipeline = create_test_pipeline(vec!["nonexistent-agent", "terminal-agent"]); let result = processor - .process_filter_chain(&initial_request.messages, &pipeline, &agent_map, &request_headers) + .process_filter_chain( + &initial_request.messages, + &pipeline, + &agent_map, + &request_headers, + ) .await; assert!(result.is_err()); diff --git a/demos/use_cases/rag_agent/mcp_query.rest b/demos/use_cases/rag_agent/mcp_query.rest index f4a5ae2c..c08dd884 100644 --- a/demos/use_cases/rag_agent/mcp_query.rest +++ b/demos/use_cases/rag_agent/mcp_query.rest @@ -9,7 +9,7 @@ Accept: application/json, text/event-stream POST http://localhost:10501/mcp Content-Type: application/json Accept: application/json, text/event-stream -mcp-session-id: e4ec1ae904e14e06b7d194da10e5f74c +mcp-session-id: 35d455dc07b8400887f86668590f12bb { "jsonrpc": "2.0", @@ -70,7 +70,7 @@ accept: application/json, text/event-stream POST http://localhost:10501/mcp content-type: application/json -mcp-session-id: 60be9fb816304cb6b9ecdb91d89cd91f +mcp-session-id: 35d455dc07b8400887f86668590f12bb accept: application/json, text/event-stream {