pending changes

This commit is contained in:
Adil Hafeez 2025-12-15 18:17:15 -08:00
parent afffa11e91
commit 358fa856c4
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
21 changed files with 1195 additions and 403 deletions

View file

@ -81,7 +81,7 @@ async fn handle_agent_chat(
) -> Result<Response<BoxBody<Bytes, hyper::Error>>, AgentFilterChainError> {
// Initialize services
let agent_selector = AgentSelector::new(router_service);
let pipeline_processor = PipelineProcessor::default();
let mut pipeline_processor = PipelineProcessor::default();
let response_handler = ResponseHandler::new();
// Extract listener name from headers
@ -144,9 +144,9 @@ async fn handle_agent_chat(
debug!("Processing agent pipeline: {}", selected_agent.id);
// Process the filter chain
let processed_messages = pipeline_processor
let chat_history = pipeline_processor
.process_filter_chain(
&chat_completions_request,
&chat_completions_request.messages,
&selected_agent,
&agent_map,
&request_headers,
@ -161,8 +161,8 @@ async fn handle_agent_chat(
debug!("Terminal agent details: {:?}", terminal_agent);
let llm_response = pipeline_processor
.invoke_upstream_agent(
&processed_messages,
.invoke_terminal_agent(
&chat_history,
&chat_completions_request,
terminal_agent,
&request_headers,

View file

@ -8,7 +8,6 @@ use hermesllm::apis::openai::Message;
use tracing::{debug, warn};
use crate::router::llm_router::RouterService;
use crate::utils::mcp_client::McpClient;
/// Errors that can occur during agent selection
#[derive(Debug, thiserror::Error)]
@ -28,14 +27,12 @@ pub enum AgentSelectionError {
/// Service for selecting agents based on routing preferences and listener configuration
pub struct AgentSelector {
router_service: Arc<RouterService>,
mcp_client: McpClient,
}
impl AgentSelector {
pub fn new(router_service: Arc<RouterService>) -> Self {
Self {
router_service,
mcp_client: McpClient::new(),
}
}
@ -152,7 +149,7 @@ impl AgentSelector {
for agent_chain in agents {
// Get the actual agent from the agent_map
let agent = agent_map.get(&agent_chain.id);
// Determine the description to use
let description = if let Some(agent) = agent {
// Check if this is an MCP agent (URL starts with mcp://)
@ -161,36 +158,10 @@ impl AgentSelector {
"Agent {} is an MCP agent, fetching tool description from: {}",
agent.id, agent.url
);
// Fetch description from MCP endpoint
match self
.mcp_client
.fetch_tool_description(&agent.url, agent.tool.as_deref())
.await
{
Ok(mcp_description) => {
if !mcp_description.is_empty() {
debug!(
"Fetched MCP description for agent {}: {}",
agent.id, mcp_description
);
mcp_description
} else {
warn!(
"MCP tool description is empty for agent {}, using config description",
agent.id
);
agent_chain.description.clone().unwrap_or_default()
}
}
Err(e) => {
warn!(
"Failed to fetch MCP description for agent {}: {}, using config description",
agent.id, e
);
agent_chain.description.clone().unwrap_or_default()
}
}
//TODO: fetch description from mcp server
"MCP tool description placeholder from config".to_string()
} else {
// Not an MCP agent, use description from config
agent_chain.description.clone().unwrap_or_default()

View file

@ -0,0 +1,44 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum JsonRpcId {
String(String),
Number(u64),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcRequest {
pub jsonrpc: String,
pub id: JsonRpcId,
pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<HashMap<String, serde_json::Value>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcNotification {
pub jsonrpc: String,
pub method: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub params: Option<HashMap<String, serde_json::Value>>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcError {
pub code: i32,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub data: Option<serde_json::Value>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct JsonRpcResponse {
pub jsonrpc: String,
pub id: JsonRpcId,
#[serde(skip_serializing_if = "Option::is_none")]
pub result: Option<HashMap<String, serde_json::Value>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub error: Option<JsonRpcError>,
}

View file

@ -6,6 +6,7 @@ pub mod function_calling;
pub mod pipeline_processor;
pub mod response_handler;
pub mod utils;
pub mod jsonrpc;
#[cfg(test)]
mod integration_tests;

View file

@ -4,7 +4,10 @@ use common::configuration::{Agent, AgentFilterChain};
use common::consts::{ARCH_UPSTREAM_HOST_HEADER, ENVOY_RETRY_HEADER};
use hermesllm::apis::openai::{ChatCompletionsRequest, Message};
use hyper::header::HeaderMap;
use tracing::{debug, warn};
use tracing::{debug, info, warn};
use crate::handlers::jsonrpc::{JsonRpcId, JsonRpcNotification, JsonRpcRequest, JsonRpcResponse};
use uuid::Uuid;
/// Errors that can occur during pipeline processing
#[derive(Debug, thiserror::Error)]
@ -25,13 +28,17 @@ pub enum PipelineError {
pub struct PipelineProcessor {
client: reqwest::Client,
url: String,
agent_id_session_map: HashMap<String, String>,
}
const ENVOY_API_ROUTER_ADDRESS: &str = "http://localhost:11000";
impl Default for PipelineProcessor {
fn default() -> Self {
Self {
client: reqwest::Client::new(),
url: "http://localhost:11000/v1/chat/completions".to_string(),
url: ENVOY_API_ROUTER_ADDRESS.to_string(),
agent_id_session_map: HashMap::new(),
}
}
}
@ -41,18 +48,20 @@ impl PipelineProcessor {
Self {
client: reqwest::Client::new(),
url,
agent_id_session_map: HashMap::new(),
}
}
/// Process the filter chain of agents (all except the terminal agent)
pub async fn process_filter_chain(
&self,
initial_request: &ChatCompletionsRequest,
&mut self,
chat_history: &[Message],
agent_filter_chain: &AgentFilterChain,
agent_map: &HashMap<String, Agent>,
request_headers: &HeaderMap,
) -> Result<Vec<Message>, PipelineError> {
let mut chat_completions_history = initial_request.messages.clone();
let mut chat_history_updated = chat_history.to_vec();
for agent_name in &agent_filter_chain.filter_chain {
debug!("Processing filter agent: {}", agent_name);
@ -61,47 +70,83 @@ impl PipelineProcessor {
.get(agent_name)
.ok_or_else(|| PipelineError::AgentNotFound(agent_name.clone()))?;
debug!("Agent details: {:?}", agent);
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
let response_content = self
.send_agent_filter_chain_request(
&chat_completions_history,
initial_request,
info!("executing filter: {}/{}, url: {}, conversation length: {}", agent_name, tool_name, agent.url, chat_history.len());
chat_history_updated = self
.execute_filter(
&chat_history_updated,
agent,
request_headers,
)
.await?;
debug!("Received response from filter agent {}", agent_name);
// Parse the response content as new message history
chat_completions_history =
serde_json::from_str(&response_content).inspect_err(|err| {
warn!(
"Failed to parse response from agent {}, err: {}, response: {}",
agent_name, err, response_content
)
})?;
info!("Received response: updated conversation length: {}", chat_history.len());
}
Ok(chat_completions_history)
Ok(chat_history_updated)
}
/// Send request to a specific agent and return the response content
async fn send_agent_filter_chain_request(
&self,
async fn execute_filter(
&mut self,
messages: &[Message],
original_request: &ChatCompletionsRequest,
agent: &Agent,
request_headers: &HeaderMap,
) -> Result<String, PipelineError> {
let mut request = original_request.clone();
request.messages = messages.to_vec();
) -> Result<Vec<Message>, PipelineError> {
let request_body = serde_json::to_string(&request)?;
debug!("Sending request to agent {}", agent.id);
let mcp_session_id = if let Some(session_id) = self.agent_id_session_map.get(&agent.id) {
session_id.clone()
} else {
let session_id = self.get_new_session_id(&agent.id).await;
self.agent_id_session_map
.insert(agent.id.clone(), session_id.clone());
session_id
};
// let mut request = original_request.clone();
// request.messages = messages.to_vec();
let tool_name = agent.tool.as_deref().unwrap_or(&agent.id);
let arguments = serde_json::json!({
"messages": messages
});
let params = serde_json::json!({
"name": tool_name,
"arguments": arguments
});
let json_rpc_request = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: JsonRpcId::String(Uuid::new_v4().to_string()),
method: "tools/call".to_string(),
params: Some(serde_json::from_value(params)?),
};
let request_body = serde_json::to_string(&json_rpc_request)?;
info!("Sending request to agent {}", agent.id);
info!("Request body: {}", request_body);
// Pretty print for debugging
let pretty_body = serde_json::to_string_pretty(&json_rpc_request)?;
info!("Request body (pretty):\n{}", pretty_body);
let mut agent_headers = request_headers.clone();
info!("Using MCP session ID {} for agent {}", mcp_session_id, agent.id);
// Log all headers being sent
info!("Headers being sent:");
for (key, value) in agent_headers.iter() {
info!(" {}: {:?}", key, value);
}
agent_headers.insert(
"mcp-session-id",
hyper::header::HeaderValue::from_str(&mcp_session_id).unwrap(),
);
agent_headers.remove(hyper::header::CONTENT_LENGTH);
agent_headers.insert(
ARCH_UPSTREAM_HOST_HEADER,
@ -114,9 +159,24 @@ impl PipelineProcessor {
hyper::header::HeaderValue::from_str("3").unwrap(),
);
agent_headers.insert(
"Accept",
hyper::header::HeaderValue::from_static("application/json, text/event-stream"),
);
agent_headers.insert(
"Content-Type",
hyper::header::HeaderValue::from_static("application/json"),
);
info!("Final headers being sent:");
for (key, value) in agent_headers.iter() {
info!(" {}: {:?}", key, value);
}
let response = self
.client
.post(&self.url)
.post(format!("{}/mcp", self.url))
.headers(agent_headers)
.body(request_body)
.send()
@ -124,24 +184,149 @@ impl PipelineProcessor {
let response_bytes = response.bytes().await?;
// Parse the response as JSON to extract the content
let response_json: serde_json::Value = serde_json::from_slice(&response_bytes)?;
info!(
"response bytes in str: {}",
String::from_utf8_lossy(&response_bytes)
);
let content = response_json
.get("choices")
.and_then(|choices| choices.as_array())
.and_then(|choices| choices.first())
.and_then(|choice| choice.get("message"))
.and_then(|message| message.get("content"))
.and_then(|content| content.as_str())
let response_str = String::from_utf8_lossy(&response_bytes);
let lines: Vec<&str> = response_str.lines().collect();
// Validate SSE format: first line should be "event: message"
if lines.is_empty() || lines[0] != "event: message" {
warn!("Invalid SSE response format from agent {}: expected 'event: message' as first line, got: {:?}", agent.id, lines.first());
return Err(PipelineError::NoContentInResponse(format!(
"Invalid SSE response format from agent {}: expected 'event: message' as first line",
agent.id
)));
}
// Find the data line
let data_lines: Vec<&str> = lines
.iter()
.filter(|line| line.starts_with("data: "))
.copied()
.collect();
if data_lines.len() != 1 {
warn!(
"Expected exactly one 'data:' line from agent {}, found {}",
agent.id,
data_lines.len()
);
return Err(PipelineError::NoContentInResponse(format!(
"Expected exactly one 'data:' line from agent {}, found {}",
agent.id,
data_lines.len()
)));
}
let data_chunk = &data_lines[0][6..]; // Skip "data: " prefix
let response: JsonRpcResponse = serde_json::from_str(data_chunk)?;
let response_result = response
.result
.ok_or_else(|| PipelineError::NoChoicesInResponse(agent.id.clone()))?;
let response_json = response_result
.get("structuredContent")
.ok_or_else(|| PipelineError::NoChoicesInResponse(agent.id.clone()))?;
// Parse the response as JSON to extract the content
// let response_json: serde_json::Value = serde_json::from_slice(&response_bytes)?;
let messages: Vec<Message> = response_json
.get("result")
.and_then(|v| v.as_array())
.ok_or_else(|| PipelineError::NoContentInResponse(agent.id.clone()))?
.iter()
.map(|msg_value| serde_json::from_value(msg_value.clone()))
.collect::<Result<Vec<Message>, _>>()
.map_err(PipelineError::ParseError)?;
Ok(messages)
}
async fn get_new_session_id(&self, agent_id: &str) -> String {
let initialize_request = JsonRpcRequest {
jsonrpc: "2.0".to_string(),
id: JsonRpcId::Number(1),
method: "initialize".to_string(),
params: Some({
let mut params = HashMap::new();
params.insert(
"protocolVersion".to_string(),
serde_json::Value::String("2024-11-05".to_string()),
);
params.insert("capabilities".to_string(), serde_json::json!({}));
params.insert(
"clientInfo".to_string(),
serde_json::json!({
"name": "brightstaff",
"version": "1.0.0"
}),
);
params
}),
};
let request_body = serde_json::to_string(&initialize_request).unwrap();
info!("Initializing MCP session for agent {}", agent_id);
info!("Initialize request body: {}", request_body);
let response = self
.client
.post(format!("{}/mcp", self.url))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header(ARCH_UPSTREAM_HOST_HEADER, agent_id)
.body(request_body)
.send()
.await
.expect("Failed to initialize MCP session");
info!("Initialize response status: {}", response.status());
info!("Initialize response headers: {:?}", response.headers());
let session_id = response
.headers()
.get("mcp-session-id")
.and_then(|v| v.to_str().ok())
.expect("No mcp-session-id in response")
.to_string();
Ok(content)
info!("Created new MCP session for agent {}: {}", agent_id, session_id);
// Send initialized notification (without id field per JSON-RPC 2.0 spec)
let initialized_notification = JsonRpcNotification {
jsonrpc: "2.0".to_string(),
method: "notifications/initialized".to_string(),
params: None,
};
let notification_body = serde_json::to_string(&initialized_notification).unwrap();
info!("Sending initialized notification: {}", notification_body);
let notif_response = self
.client
.post(format!("{}/mcp", self.url))
.header("Content-Type", "application/json")
.header("Accept", "application/json, text/event-stream")
.header("mcp-session-id", &session_id)
.header(ARCH_UPSTREAM_HOST_HEADER, agent_id)
.body(notification_body)
.send()
.await
.expect("Failed to send initialized notification");
info!("Initialized notification response status: {}", notif_response.status());
session_id
}
/// Send request to terminal agent and return the raw response for streaming
pub async fn invoke_upstream_agent(
pub async fn invoke_terminal_agent(
&self,
messages: &[Message],
original_request: &ChatCompletionsRequest,
@ -169,7 +354,7 @@ impl PipelineProcessor {
let response = self
.client
.post(&self.url)
.post(format!("{}/v1/chat/completions", self.url))
.headers(agent_headers)
.body(request_body)
.send()

View file

@ -5,7 +5,7 @@ use brightstaff::handlers::function_calling::{function_calling_chat_handler};
use brightstaff::router::llm_router::RouterService;
use brightstaff::utils::tracing::init_tracer;
use bytes::Bytes;
use common::configuration::Configuration;
use common::configuration::{Agent, Configuration};
use common::consts::{CHAT_COMPLETIONS_PATH, MESSAGES_PATH, OPENAI_RESPONSES_API_PATH};
use http_body_util::{combinators::BoxBody, BodyExt, Empty};
use hyper::body::Incoming;
@ -63,9 +63,18 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let arch_config = Arc::new(config);
// combine agents and agent_filters into a single list of agents
let all_agents: Vec<Agent> = arch_config
.agents
.as_deref()
.unwrap_or_default()
.iter()
.chain(arch_config.agent_filters.as_deref().unwrap_or_default())
.cloned()
.collect();
let llm_providers = Arc::new(RwLock::new(arch_config.model_providers.clone()));
let agents_list = Arc::new(RwLock::new(arch_config.agents.clone()));
let agent_filters = Arc::new(RwLock::new(arch_config.agent_filters.clone()));
let agents_list = Arc::new(RwLock::new(Some(all_agents)));
let listeners = Arc::new(RwLock::new(arch_config.listeners.clone()));
debug!(
@ -112,7 +121,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_providers = llm_providers.clone();
let agents_list = agents_list.clone();
let agent_filters = agent_filters.clone();
let listeners = listeners.clone();
let service = service_fn(move |req| {
let router_service = Arc::clone(&router_service);
@ -121,7 +129,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_providers = llm_providers.clone();
let model_aliases = Arc::clone(&model_aliases);
let agents_list = agents_list.clone();
let agent_filters = agent_filters.clone();
let listeners = listeners.clone();
async move {

1
crates/build.sh Normal file
View file

@ -0,0 +1 @@
cargo build --release --target wasm32-wasip1 -p prompt_gateway -p llm_gateway && cargo build --release -p brightstaff

View file

@ -21,16 +21,10 @@ pub struct ModelAlias {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Agent {
pub id: String,
pub transport: Option<String>,
pub tool: Option<String>,
pub url: String,
pub kind: Option<String>,
pub url: String,
pub tool: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentFilter {
pub id: String,
pub url: String,
pub tool: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -65,7 +59,7 @@ pub struct Configuration {
pub mode: Option<GatewayMode>,
pub routing: Option<Routing>,
pub agents: Option<Vec<Agent>>,
pub agent_filters: Option<Vec<AgentFilter>>,
pub agent_filters: Option<Vec<Agent>>,
pub listeners: Vec<Listener>,
}