diff --git a/arch/arch_config_schema.yaml b/arch/arch_config_schema.yaml index f481b389..28e9d2e6 100644 --- a/arch/arch_config_schema.yaml +++ b/arch/arch_config_schema.yaml @@ -14,6 +14,10 @@ properties: type: array items: type: object + agent_filters: + type: array + items: + type: object listeners: oneOf: - type: array diff --git a/crates/brightstaff/src/handlers/agent_chat_completions.rs b/crates/brightstaff/src/handlers/agent_chat_completions.rs index a1a00f88..e15c0188 100644 --- a/crates/brightstaff/src/handlers/agent_chat_completions.rs +++ b/crates/brightstaff/src/handlers/agent_chat_completions.rs @@ -124,20 +124,25 @@ async fn handle_agent_chat( .find(|(key, _)| key.as_str() == "traceparent") .map(|(_, value)| value.to_str().unwrap_or_default().to_string()); - // Select appropriate agent using arch router llm model - let selected_agent = agent_selector - .select_agent(&chat_completions_request.messages, &listener, trace_parent) - .await?; - - debug!("Processing agent pipeline: {}", selected_agent.id); - - // Create agent map for pipeline processing + // Create agent map for pipeline processing and agent selection let agent_map = { let agents = agents_list.read().await; let agents = agents.as_ref().unwrap(); agent_selector.create_agent_map(agents) }; + // Select appropriate agent using arch router llm model + let selected_agent = agent_selector + .select_agent( + &chat_completions_request.messages, + &listener, + trace_parent, + &agent_map, + ) + .await?; + + debug!("Processing agent pipeline: {}", selected_agent.id); + // Process the filter chain let processed_messages = pipeline_processor .process_filter_chain( diff --git a/crates/brightstaff/src/handlers/agent_selector.rs b/crates/brightstaff/src/handlers/agent_selector.rs index 0fff1198..74d73af1 100644 --- a/crates/brightstaff/src/handlers/agent_selector.rs +++ b/crates/brightstaff/src/handlers/agent_selector.rs @@ -8,6 +8,7 @@ use hermesllm::apis::openai::Message; use tracing::{debug, warn}; use 
crate::router::llm_router::RouterService; +use crate::utils::mcp_client::McpClient; /// Errors that can occur during agent selection #[derive(Debug, thiserror::Error)] @@ -20,16 +21,22 @@ pub enum AgentSelectionError { RoutingError(String), #[error("Default agent not found for listener: {0}")] DefaultAgentNotFound(String), + #[error("MCP client error: {0}")] + McpError(String), } /// Service for selecting agents based on routing preferences and listener configuration pub struct AgentSelector { router_service: Arc, + mcp_client: McpClient, } impl AgentSelector { pub fn new(router_service: Arc) -> Self { - Self { router_service } + Self { + router_service, + mcp_client: McpClient::new(), + } } /// Find listener by name from the request headers @@ -65,6 +72,7 @@ impl AgentSelector { messages: &[Message], listener: &Listener, trace_parent: Option, + agent_map: &HashMap, ) -> Result { let agents = listener .agents @@ -77,7 +85,9 @@ impl AgentSelector { return Ok(agents[0].clone()); } - let usage_preferences = self.convert_agent_description_to_routing_preferences(agents); + let usage_preferences = self + .convert_agent_description_to_routing_preferences(agents, agent_map) + .await; debug!( "Agents usage preferences for agent routing str: {}", serde_json::to_string(&usage_preferences).unwrap_or_default() @@ -131,20 +141,75 @@ impl AgentSelector { } /// Convert agent descriptions to routing preferences - fn convert_agent_description_to_routing_preferences( + /// For agents with MCP URLs, fetches the tool description from the MCP server + async fn convert_agent_description_to_routing_preferences( &self, agents: &[AgentFilterChain], + agent_map: &HashMap, ) -> Vec { - agents - .iter() - .map(|agent| ModelUsagePreference { - model: agent.id.clone(), + let mut preferences = Vec::new(); + + for agent_chain in agents { + // Get the actual agent from the agent_map + let agent = agent_map.get(&agent_chain.id); + + // Determine the description to use + let description = if let 
Some(agent) = agent { + // Check if this is an MCP agent (URL starts with mcp://) + if agent.url.starts_with("mcp://") { + debug!( + "Agent {} is an MCP agent, fetching tool description from: {}", + agent.id, agent.url + ); + + // Fetch description from MCP endpoint + match self + .mcp_client + .fetch_tool_description(&agent.url, agent.tool.as_deref()) + .await + { + Ok(mcp_description) => { + if !mcp_description.is_empty() { + debug!( + "Fetched MCP description for agent {}: {}", + agent.id, mcp_description + ); + mcp_description + } else { + warn!( + "MCP tool description is empty for agent {}, using config description", + agent.id + ); + agent_chain.description.clone().unwrap_or_default() + } + } + Err(e) => { + warn!( + "Failed to fetch MCP description for agent {}: {}, using config description", + agent.id, e + ); + agent_chain.description.clone().unwrap_or_default() + } + } + } else { + // Not an MCP agent, use description from config + agent_chain.description.clone().unwrap_or_default() + } + } else { + // Agent not found in map, use description from config + agent_chain.description.clone().unwrap_or_default() + }; + + preferences.push(ModelUsagePreference { + model: agent_chain.id.clone(), routing_preferences: vec![RoutingPreference { - name: agent.id.clone(), - description: agent.description.as_ref().unwrap_or(&String::new()).clone(), + name: agent_chain.id.clone(), + description, }], - }) - .collect() + }); + } + + preferences } } @@ -185,6 +250,7 @@ mod tests { id: name.to_string(), kind: Some("test".to_string()), url: "http://localhost:8080".to_string(), + tool: None, } } @@ -240,8 +306,8 @@ mod tests { assert!(agent_map.contains_key("agent2")); } - #[test] - fn test_convert_agent_description_to_routing_preferences() { + #[tokio::test] + async fn test_convert_agent_description_to_routing_preferences() { let router_service = create_test_router_service(); let selector = AgentSelector::new(router_service); @@ -250,7 +316,15 @@ mod tests { 
create_test_agent("agent2", "Second agent description", false), ]; - let preferences = selector.convert_agent_description_to_routing_preferences(&agents); + let agent_structs = vec![ + create_test_agent_struct("agent1"), + create_test_agent_struct("agent2"), + ]; + let agent_map = selector.create_agent_map(&agent_structs); + + let preferences = selector + .convert_agent_description_to_routing_preferences(&agents, &agent_map) + .await; assert_eq!(preferences.len(), 2); assert_eq!(preferences[0].model, "agent1"); diff --git a/crates/brightstaff/src/main.rs b/crates/brightstaff/src/main.rs index 87bdea36..2d69424c 100644 --- a/crates/brightstaff/src/main.rs +++ b/crates/brightstaff/src/main.rs @@ -65,6 +65,7 @@ async fn main() -> Result<(), Box> { let llm_providers = Arc::new(RwLock::new(arch_config.model_providers.clone())); let agents_list = Arc::new(RwLock::new(arch_config.agents.clone())); + let agent_filters = Arc::new(RwLock::new(arch_config.agent_filters.clone())); let listeners = Arc::new(RwLock::new(arch_config.listeners.clone())); debug!( @@ -111,6 +112,7 @@ async fn main() -> Result<(), Box> { let llm_providers = llm_providers.clone(); let agents_list = agents_list.clone(); + let agent_filters = agent_filters.clone(); let listeners = listeners.clone(); let service = service_fn(move |req| { let router_service = Arc::clone(&router_service); @@ -119,6 +121,7 @@ async fn main() -> Result<(), Box> { let llm_providers = llm_providers.clone(); let model_aliases = Arc::clone(&model_aliases); let agents_list = agents_list.clone(); + let agent_filters = agent_filters.clone(); let listeners = listeners.clone(); async move { diff --git a/crates/brightstaff/src/utils/mcp_client.rs b/crates/brightstaff/src/utils/mcp_client.rs new file mode 100644 index 00000000..a818e43f --- /dev/null +++ b/crates/brightstaff/src/utils/mcp_client.rs @@ -0,0 +1,235 @@ +use reqwest::Client; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use tracing::{debug, warn}; + 
+/// MCP Tool definition from tools/list response +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct McpTool { + pub name: String, + pub description: Option, + #[serde(rename = "inputSchema")] + pub input_schema: Option, +} + +/// Response from MCP tools/list endpoint +#[derive(Debug, Serialize, Deserialize)] +struct McpToolsListResponse { + tools: Vec, +} + +/// Errors that can occur during MCP communication +#[derive(Debug, thiserror::Error)] +pub enum McpClientError { + #[error("HTTP request failed: {0}")] + HttpError(#[from] reqwest::Error), + #[error("Failed to parse response: {0}")] + ParseError(#[from] serde_json::Error), + #[error("Invalid MCP URL: {0}")] + InvalidUrl(String), + #[error("Tool not found: {0}")] + ToolNotFound(String), +} + +/// Client for communicating with MCP (Model Context Protocol) servers +pub struct McpClient { + client: Client, +} + +impl Default for McpClient { + fn default() -> Self { + Self::new() + } +} + +impl McpClient { + pub fn new() -> Self { + Self { + client: Client::new(), + } + } + + /// Parse MCP URL to extract host, port, and optional tool name + /// Supports formats: + /// - mcp://host:port + /// - mcp://host:port/tool_name + /// - mcp://host:port?tool=tool_name + fn parse_mcp_url(&self, mcp_url: &str) -> Result<(String, Option), McpClientError> { + // Remove mcp:// prefix + let url_without_scheme = mcp_url + .strip_prefix("mcp://") + .ok_or_else(|| McpClientError::InvalidUrl(format!("URL must start with mcp://: {}", mcp_url)))?; + + // Parse host:port and optional tool + let base_url: String; + let mut tool_name: Option = None; + + if let Some(query_start) = url_without_scheme.find('?') { + // Format: mcp://host:port?tool=tool_name + base_url = url_without_scheme[..query_start].to_string(); + let query = &url_without_scheme[query_start + 1..]; + + // Parse query parameters + for param in query.split('&') { + if let Some((key, value)) = param.split_once('=') { + if key == "tool" { + tool_name = 
Some(value.to_string()); + } + } + } + } else if let Some(path_start) = url_without_scheme.find('/') { + // Format: mcp://host:port/tool_name + base_url = url_without_scheme[..path_start].to_string(); + tool_name = Some(url_without_scheme[path_start + 1..].to_string()); + } else { + // Format: mcp://host:port + base_url = url_without_scheme.to_string(); + } + + Ok((format!("http://{}", base_url), tool_name)) + } + + /// Fetch list of tools from MCP server via SSE + pub async fn fetch_tools(&self, mcp_url: &str) -> Result, McpClientError> { + let (http_url, _) = self.parse_mcp_url(mcp_url)?; + let tools_list_url = format!("{}/sse/tools/list", http_url); + + debug!("Fetching tools from MCP endpoint: {}", tools_list_url); + + let response = self.client + .get(&tools_list_url) + .header("Accept", "text/event-stream") + .send() + .await?; + + if !response.status().is_success() { + warn!( + "Failed to fetch tools from {}: status {}", + tools_list_url, + response.status() + ); + return Ok(Vec::new()); + } + + let body = response.text().await?; + debug!("Received tools list response: {}", body); + + // Parse SSE response - looking for data: lines + let mut tools = Vec::new(); + for line in body.lines() { + if let Some(data) = line.strip_prefix("data: ") { + if data.trim() == "[DONE]" { + break; + } + + match serde_json::from_str::(data) { + Ok(response) => { + tools.extend(response.tools); + } + Err(e) => { + debug!("Failed to parse tools list data: {}, line: {}", e, data); + } + } + } + } + + debug!("Fetched {} tools from MCP server", tools.len()); + Ok(tools) + } + + /// Fetch specific tool description from MCP server + /// If tool_name is None, uses the tool name from the URL or returns the first tool + pub async fn fetch_tool_description( + &self, + mcp_url: &str, + tool_name_override: Option<&str>, + ) -> Result { + let (_, url_tool_name) = self.parse_mcp_url(mcp_url)?; + + // Determine which tool to look for + let target_tool_name = tool_name_override + 
.or(url_tool_name.as_deref()) + .ok_or_else(|| { + McpClientError::InvalidUrl( + "No tool name specified in URL or parameter".to_string() + ) + })?; + + debug!("Fetching description for tool: {}", target_tool_name); + + let tools = self.fetch_tools(mcp_url).await?; + + let tool = tools + .iter() + .find(|t| t.name == target_tool_name) + .ok_or_else(|| McpClientError::ToolNotFound(target_tool_name.to_string()))?; + + Ok(tool.description.clone().unwrap_or_default()) + } + + /// Fetch all tools as a map of tool name to description + pub async fn fetch_tools_map( + &self, + mcp_url: &str, + ) -> Result, McpClientError> { + let tools = self.fetch_tools(mcp_url).await?; + + Ok(tools + .into_iter() + .map(|tool| { + (tool.name, tool.description.unwrap_or_default()) + }) + .collect()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_parse_mcp_url_basic() { + let client = McpClient::new(); + + let (http_url, tool) = client.parse_mcp_url("mcp://localhost:10500").unwrap(); + assert_eq!(http_url, "http://localhost:10500"); + assert_eq!(tool, None); + } + + #[test] + fn test_parse_mcp_url_with_path() { + let client = McpClient::new(); + + let (http_url, tool) = client.parse_mcp_url("mcp://localhost:10500/rewrite_query").unwrap(); + assert_eq!(http_url, "http://localhost:10500"); + assert_eq!(tool, Some("rewrite_query".to_string())); + } + + #[test] + fn test_parse_mcp_url_with_query_param() { + let client = McpClient::new(); + + let (http_url, tool) = client.parse_mcp_url("mcp://localhost:10500?tool=rewrite_query").unwrap(); + assert_eq!(http_url, "http://localhost:10500"); + assert_eq!(tool, Some("rewrite_query".to_string())); + } + + #[test] + fn test_parse_mcp_url_with_host_docker_internal() { + let client = McpClient::new(); + + let (http_url, tool) = client + .parse_mcp_url("mcp://host.docker.internal:10500/context_builder") + .unwrap(); + assert_eq!(http_url, "http://host.docker.internal:10500"); + assert_eq!(tool, 
Some("context_builder".to_string())); + } + + #[test] + fn test_parse_mcp_url_invalid() { + let client = McpClient::new(); + + let result = client.parse_mcp_url("http://localhost:10500"); + assert!(result.is_err()); + } +} diff --git a/crates/brightstaff/src/utils/mod.rs b/crates/brightstaff/src/utils/mod.rs index 5ee45fbc..a6bfa629 100644 --- a/crates/brightstaff/src/utils/mod.rs +++ b/crates/brightstaff/src/utils/mod.rs @@ -1 +1,2 @@ +pub mod mcp_client; pub mod tracing; diff --git a/crates/common/src/configuration.rs b/crates/common/src/configuration.rs index 27f8ebd9..6b6557dc 100644 --- a/crates/common/src/configuration.rs +++ b/crates/common/src/configuration.rs @@ -23,6 +23,14 @@ pub struct Agent { pub id: String, pub kind: Option, pub url: String, + pub tool: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct AgentFilter { + pub id: String, + pub url: String, + pub tool: Option, } #[derive(Debug, Clone, Serialize, Deserialize)] @@ -57,6 +65,7 @@ pub struct Configuration { pub mode: Option, pub routing: Option, pub agents: Option>, + pub agent_filters: Option>, pub listeners: Vec, } diff --git a/demos/use_cases/rag_agent/README.md b/demos/use_cases/rag_agent/README.md index 66102f6f..f46a3b8c 100644 --- a/demos/use_cases/rag_agent/README.md +++ b/demos/use_cases/rag_agent/README.md @@ -1,28 +1,148 @@ -# RAG Agent Query Parser +# RAG Agent with MCP Protocol -A FastAPI service that rewrites user queries using archgw and gpt-4o-mini for better retrieval accuracy. +A multi-agent RAG system using the Model Context Protocol (MCP) for agent communication. -## How it Works +## Architecture -1. Receives a chat completion request with conversation history -2. Calls archgw's LLM gateway with gpt-4o-mini to rewrite the last user query -3. Returns the rewritten query as the assistant response +This demo consists of three MCP agents: +1. **Query Rewriter** - Rewrites user queries for better retrieval +2. 
**Context Builder** - Retrieves relevant context from knowledge base +3. **Response Generator** - Generates final responses with context + +Each agent runs as an independent MCP server and exposes tools that can be called via MCP. + +## MCP Tools + +### Query Rewriter Agent +- **Tool**: `query_rewriter` +- **Description**: Rewrites user queries using LLM for better retrieval +- **Port**: 10500 + +### Context Builder Agent +- **Tool**: `context_builder` +- **Description**: Augments queries with relevant context from knowledge base +- **Port**: 10501 + +### Response Generator Agent +- **Port**: 10502 ## Setup and Running -1. **Start archgw**: - ```bash - archgw up --foreground - ``` +### 1. Start archgw +```bash +archgw up --foreground +``` -2. **Start the query parser service**: - ```bash - uv run python -m rag_agent.query_parser - ``` +### 2. Start Individual Agents + +**Query Rewriter:** +```bash +uv run python -m rag_agent \ + --agent query_rewriter \ + --host 0.0.0.0 \ + --port 10500 \ + --transport sse +``` + +**Context Builder:** +```bash +uv run python -m rag_agent \ + --agent context_builder \ + --host 0.0.0.0 \ + --port 10501 \ + --transport sse +``` + +**Response Generator:** +```bash +uv run python -m rag_agent \ + --agent response_generator \ + --host 0.0.0.0 \ + --port 10502 \ + --transport sse +``` + +### 3. Start All Agents at Once +```bash +./start_agents.sh +``` ## Configuration +The `arch_config.yaml` defines how agents are connected: + +```yaml +agent_filters: + - id: query_rewriter + url: mcp://host.docker.internal:10500 + tool: query_rewriter # MCP tool name + + - id: context_builder + url: mcp://host.docker.internal:10501 + tool: context_builder +``` + +### MCP Tool Invocation Patterns + +The config supports different ways to specify MCP tools: + +**1. Separate tool field (recommended):** +```yaml +- id: query_rewriter + url: mcp://host.docker.internal:10500 + tool: query_rewriter +``` + +**2. 
Tool in URL path:** +```yaml +- id: query_rewriter + url: mcp://host.docker.internal:10500/query_rewriter +``` + +**3. Tool as query parameter:** +```yaml +- id: query_rewriter + url: mcp://host.docker.internal:10500?tool=query_rewriter +``` + +## CLI Options + +```bash +uv run python -m rag_agent --help + +Options: + --transport TEXT Transport type: stdio or sse (default: sse) + --host TEXT Host to bind MCP server to (default: localhost) + --port INTEGER Port for MCP server (default: 10500) + --agent TEXT Agent name: query_rewriter, context_builder, or response_generator (required) + --name TEXT Custom MCP server name (optional) +``` + +## Environment Variables + ```bash # archgw LLM Gateway base URL (default: http://localhost:12000/v1) export LLM_GATEWAY_ENDPOINT="http://localhost:12000/v1" + +# OpenAI API Key for model providers +export OPENAI_API_KEY="your-key-here" +``` + +## Testing + +See `sample_queries.md` for example queries to test the RAG system. + +Example request: +```bash +curl -X POST http://localhost:8001/v1/chat/completions \ + -H "Content-Type: application/json" \ + -d '{ + "model": "gpt-4o", + "messages": [ + { + "role": "user", + "content": "What is the guaranteed uptime for TechCorp?" 
+ } + ] + }' ``` diff --git a/demos/use_cases/rag_agent/arch_config.yaml b/demos/use_cases/rag_agent/arch_config.yaml index b28b5de5..4c53c9fb 100644 --- a/demos/use_cases/rag_agent/arch_config.yaml +++ b/demos/use_cases/rag_agent/arch_config.yaml @@ -1,12 +1,24 @@ version: v0.3.0 agents: - - id: query_rewriter - url: http://host.docker.internal:10500/v1/chat/completions - - id: context_builder - url: http://host.docker.internal:10501/v1/chat/completions - id: rag_agent - url: http://host.docker.internal:10502/v1/chat/completions + url: mcp://host.docker.internal:10501 + # only sse is supported + # transport: sse or stdio + # optional tool name, defaults to "invoke" + # tool: invoke + - id: travel_agent + url: mcp://host.docker.internal:10502 + +agent_filters: + - id: query_rewriter + url: mcp://host.docker.internal:10500 + # tool is optional, defaults to id + # tool: query_rewriter + - id: context_builder + url: mcp://host.docker.internal:10500 + - id: input_guards + url: mcp://host.docker.internal:10500 model_providers: - model: openai/gpt-4o-mini @@ -23,15 +35,20 @@ model_aliases: listeners: - type: agent - name: agent_1 port: 8001 router: arch_agent_router agents: - id: rag_agent - description: virtual assistant for device contracts for simple queries + description: virtual assistant for retrieval augmented generation tasks filter_chain: + - input_guards - query_rewriter - context_builder + - id: travel_agent + description: virtual assistant for travel bookings and recommendations + filter_chain: + - input_guards + tracing: random_sampling: 100 diff --git a/demos/use_cases/rag_agent/src/rag_agent/__init__.py b/demos/use_cases/rag_agent/src/rag_agent/__init__.py index f21cc25b..9e8d04b5 100644 --- a/demos/use_cases/rag_agent/src/rag_agent/__init__.py +++ b/demos/use_cases/rag_agent/src/rag_agent/__init__.py @@ -5,57 +5,45 @@ mcp = None @click.command() -@click.option("--transport", "transport", default="stdio") -@click.option("--host", "host", 
default="localhost") -@click.option("--port", "port", default=10101) -@click.option("--agent", "agent", default=None) -@click.option( - "--rest-server", - "rest_server", - is_flag=True, - help="Start REST server instead of MCP server", -) -@click.option("--rest-port", "rest_port", default=8000, help="Port for REST server") -def main(host, port, agent, transport, rest_server, rest_port): - if rest_server: - print(f"Starting REST server on {host}:{rest_port} for agent: {agent}") - - if agent == "query_parser": - from rag_agent.query_rewriter_agent import start_server - - start_server(host=host, port=rest_port) - return - elif agent == "context_builder": - from rag_agent.context_builder_agent import ( - start_server, - ) - - start_server(host=host, port=rest_port) - return - elif agent == "response_generator": - from rag_agent.response_generator_agent import start_server - - start_server(host=host, port=rest_port) - return - else: - print("Please specify an agent to start with --agent option.") - return - - print(f"Starting agent(s): {agent if agent else 'all'}") +@click.option("--transport", "transport", default="sse", help="Transport type: stdio or sse") +@click.option("--host", "host", default="localhost", help="Host to bind MCP server to") +@click.option("--port", "port", type=int, default=10500, help="Port for MCP server") +@click.option("--agent", "agent", required=True, help="Agent name: query_rewriter, context_builder, or response_generator") +@click.option("--name", "agent_name", default=None, help="Custom MCP server name (defaults to agent type)") +def main(host, port, agent, transport, agent_name): + """Start a RAG agent as an MCP server.""" + + # Map friendly names to agent modules + agent_map = { + "query_rewriter": ("rag_agent.query_rewriter", "Query Rewriter Agent"), + "context_builder": ("rag_agent.context_builder_agent", "Context Builder Agent"), + "response_generator": ("rag_agent.response_generator", "Response Generator Agent"), + } + + if agent not 
in agent_map: + print(f"Error: Unknown agent '{agent}'") + print(f"Available agents: {', '.join(agent_map.keys())}") + return + + module_name, default_name = agent_map[agent] + mcp_name = agent_name or default_name + + print(f"Starting MCP server: {mcp_name}") + print(f" Agent: {agent}") + print(f" Transport: {transport}") + print(f" Host: {host}") + print(f" Port: {port}") + global mcp - mcp = FastMCP("RAG Agent Demo", host=host, port=port) - - if agent == "query_parser": - import rag_agent.query_parser - elif agent == "document_store": - import rag_agent.document_store - elif agent == "response_generator": - import rag_agent.response_generator - else: - import rag_agent.query_parser - import rag_agent.document_store - import rag_agent.response_generator - print("All agents loaded.") + mcp = FastMCP(mcp_name, host=host, port=port) + + # Import the agent module to register its tools + import importlib + importlib.import_module(module_name) + + print(f"Agent '{agent}' loaded successfully") + print(f"MCP server ready on {transport}://{host}:{port}") + mcp.run(transport=transport) diff --git a/demos/use_cases/rag_agent/src/rag_agent/context_builder_agent.py b/demos/use_cases/rag_agent/src/rag_agent/context_builder.py similarity index 95% rename from demos/use_cases/rag_agent/src/rag_agent/context_builder_agent.py rename to demos/use_cases/rag_agent/src/rag_agent/context_builder.py index b225aeed..5d3274fd 100644 --- a/demos/use_cases/rag_agent/src/rag_agent/context_builder_agent.py +++ b/demos/use_cases/rag_agent/src/rag_agent/context_builder.py @@ -10,7 +10,8 @@ from pathlib import Path import uvicorn from .api import ChatMessage, ChatCompletionRequest, ChatCompletionResponse - +from . 
import mcp +from fastmcp.server.dependencies import get_http_headers # Set up logging logging.basicConfig( @@ -190,12 +191,12 @@ class Response(BaseModel): # FastAPI app for REST server app = FastAPI(title="RAG Content Builder Agent", version="1.0.0") - +@mcp.tool() @app.post("/v1/chat/completions") -async def chat_completions( - request_body: ChatCompletionRequest, request: Request +async def context_builder( + request_body: ChatCompletionRequest ) -> ChatCompletionResponse: - """Chat completions endpoint that augments user queries with relevant context from the knowledge base.""" + """ chat completions endpoint that augments user queries with relevant context from the knowledge base.""" import time import uuid @@ -203,8 +204,10 @@ async def chat_completions( f"Received chat completion request with {len(request_body.messages)} messages" ) - # Read traceparent header if present - traceparent_header = request.headers.get("traceparent") + # Get traceparent header from HTTP request using FastMCP's dependency function + headers = get_http_headers() + traceparent_header = headers.get("traceparent") + if traceparent_header: logger.info(f"Received traceparent header: {traceparent_header}") else: diff --git a/demos/use_cases/rag_agent/src/rag_agent/query_rewriter_agent.py b/demos/use_cases/rag_agent/src/rag_agent/query_rewriter.py similarity index 93% rename from demos/use_cases/rag_agent/src/rag_agent/query_rewriter_agent.py rename to demos/use_cases/rag_agent/src/rag_agent/query_rewriter.py index 6b2447dd..3cf32fec 100644 --- a/demos/use_cases/rag_agent/src/rag_agent/query_rewriter_agent.py +++ b/demos/use_cases/rag_agent/src/rag_agent/query_rewriter.py @@ -8,7 +8,8 @@ import logging import uvicorn from .api import ChatMessage, ChatCompletionRequest, ChatCompletionResponse - +from . 
import mcp +from fastmcp.server.dependencies import get_http_headers # Set up logging logging.basicConfig( @@ -28,11 +29,10 @@ archgw_client = AsyncOpenAI( api_key="EMPTY", # archgw doesn't require a real API key ) - async def rewrite_query_with_archgw( messages: List[ChatMessage], traceparent_header: str ) -> str: - # Prepare the system prompt for query rewriting + """ Rewrite the user query using LLM for better retrieval. """ system_prompt = """You are a query rewriter that improves user queries for better retrieval. Given a conversation history, rewrite the last user message to be more specific and context-aware. @@ -90,7 +90,8 @@ app = FastAPI(title="RAG Agent Query Parser", version="1.0.0") @app.post("/v1/chat/completions") -async def chat_completions(request_body: ChatCompletionRequest, request: Request): +@mcp.tool() +async def query_rewriter(request_body: ChatCompletionRequest): """Chat completions endpoint that rewrites the last user query using archgw.""" import time import uuid @@ -99,8 +100,10 @@ async def chat_completions(request_body: ChatCompletionRequest, request: Request f"Received chat completion request with {len(request_body.messages)} messages" ) - # Read traceparent header if present - traceparent_header = request.headers.get("traceparent") + # Get traceparent header from HTTP request using FastMCP's dependency function + headers = get_http_headers() + traceparent_header = headers.get("traceparent") + if traceparent_header: logger.info(f"Received traceparent header: {traceparent_header}") else: diff --git a/demos/use_cases/rag_agent/src/rag_agent/response_generator_agent.py b/demos/use_cases/rag_agent/src/rag_agent/rag_agent.py similarity index 96% rename from demos/use_cases/rag_agent/src/rag_agent/response_generator_agent.py rename to demos/use_cases/rag_agent/src/rag_agent/rag_agent.py index 735f11da..1bf32a66 100644 --- a/demos/use_cases/rag_agent/src/rag_agent/response_generator_agent.py +++ 
b/demos/use_cases/rag_agent/src/rag_agent/rag_agent.py @@ -15,6 +15,9 @@ from .api import ( ChatCompletionStreamResponse, ) +from . import mcp +from fastmcp.server.dependencies import get_http_headers + # Set up logging logging.basicConfig( level=logging.INFO, @@ -60,14 +63,17 @@ def prepare_response_messages(request_body: ChatCompletionRequest): @app.post("/v1/chat/completions") -async def chat_completions(request_body: ChatCompletionRequest, request: Request): +@mcp.tool(name="invoke") +async def chat_completion(request_body: ChatCompletionRequest): """Chat completions endpoint that generates a coherent response based on all context.""" logger.info( f"Received chat completion request with {len(request_body.messages)} messages" ) - # Read traceparent header if present - traceparent_header = request.headers.get("traceparent") + # Get traceparent header from HTTP request using FastMCP's dependency function + headers = get_http_headers() + traceparent_header = headers.get("traceparent") + if traceparent_header: logger.info(f"Received traceparent header: {traceparent_header}") else: diff --git a/docs/MCP_AGENT_INTEGRATION.md b/docs/MCP_AGENT_INTEGRATION.md new file mode 100644 index 00000000..5cbe2e59 --- /dev/null +++ b/docs/MCP_AGENT_INTEGRATION.md @@ -0,0 +1,174 @@ +# MCP Agent Description Integration + +## Overview + +This implementation adds support for fetching agent tool descriptions from MCP (Model Context Protocol) endpoints during agent selection. This allows the system to use the actual tool descriptions from MCP servers for intelligent agent routing instead of relying solely on static descriptions in the configuration file. + +## Architecture + +### Components Modified + +1. **MCP Client Module** (`brightstaff/src/utils/mcp_client.rs`) + - New module that handles communication with MCP servers via SSE + - Fetches tool lists and descriptions from MCP endpoints + - Parses MCP URLs in multiple formats + +2. 
**Configuration Structs** (`common/src/configuration.rs`) + - Added optional `tool` field to `Agent` struct + - Added optional `tool` field to `AgentFilter` struct + - Supports specifying which MCP tool to invoke + +3. **Agent Selector** (`brightstaff/src/handlers/agent_selector.rs`) + - Enhanced to fetch tool descriptions from MCP endpoints + - Uses MCP descriptions for agent routing when available + - Falls back to configuration descriptions if MCP fetch fails + +4. **Agent Chat Completions** (`brightstaff/src/handlers/agent_chat_completions.rs`) + - Updated to pass agent_map to agent selector + - Ensures agent information is available during selection + +## How It Works + +### 1. Configuration + +Agents can now be configured with MCP URLs and optional tool names: + +```yaml +agents: + - id: rag_agent + url: mcp://host.docker.internal:10501 + tool: invoke # Optional: defaults to agent id + + - id: travel_agent + url: mcp://host.docker.internal:10502 + +agent_filters: + - id: query_rewriter + url: mcp://host.docker.internal:10500 + tool: rewrite_query_with_archgw # Optional +``` + +### 2. MCP URL Parsing + +The MCP client supports three URL formats: + +``` +mcp://host:port # Basic format +mcp://host:port/tool_name # Tool in path +mcp://host:port?tool=tool_name # Tool as query param +``` + +### 3. Description Fetching + +During agent selection: + +1. Agent selector checks if agent URL starts with `mcp://` +2. If yes, calls MCP client to fetch tool description from endpoint +3. MCP client makes GET request to `http://host:port/sse/tools/list` +4. Parses SSE response to extract tool descriptions +5. Returns description for specified tool (from config or URL) +6. Falls back to config description if MCP fetch fails + +### 4. 
Agent Routing + +The fetched MCP tool descriptions are used in routing preferences: + +```rust +ModelUsagePreference { + model: agent_id, + routing_preferences: vec![RoutingPreference { + name: agent_id, + description: mcp_description, // From MCP endpoint + }], +} +``` + +The LLM router uses these descriptions to select the appropriate agent based on the user's request. + +## Key Features + +1. **Automatic Description Fetching**: Tool descriptions are automatically fetched from MCP servers when agents have `mcp://` URLs + +2. **Graceful Fallback**: If MCP endpoint is unavailable or doesn't return a description, falls back to the description in arch_config.yaml + +3. **Multiple URL Formats**: Supports flexible MCP URL specification for tool names + +4. **Async Operation**: All MCP fetching is done asynchronously to avoid blocking + +5. **Comprehensive Logging**: Debug and warning logs track MCP interactions + +## Example Usage + +### Configuration (arch_config.yaml) + +```yaml +version: v0.3.0 + +agents: + - id: rag_agent + url: mcp://host.docker.internal:10501 + # Description will be fetched from MCP endpoint + +agent_filters: + - id: query_rewriter + url: mcp://host.docker.internal:10500 + tool: rewrite_query_with_archgw + +listeners: + - type: agent + port: 8001 + router: arch_agent_router + agents: + - id: rag_agent + description: fallback description if MCP unavailable + filter_chain: + - query_rewriter +``` + +### Flow + +1. User sends request to listener +2. Agent selector creates agent map from configuration +3. For each agent in listener: + - Checks if agent URL is MCP (`mcp://`) + - Fetches tool description from MCP endpoint + - Uses fetched description for routing +4. LLM router selects best agent based on descriptions +5. 
Filter chain processes request using selected agent + +## Testing + +Unit tests cover: +- MCP URL parsing (basic, path, query param formats) +- Agent selection with MCP descriptions +- Fallback to config descriptions +- Agent map creation + +Integration tests verify: +- End-to-end agent selection flow +- Pipeline processing with MCP agents + +## Error Handling + +The implementation handles: +- Invalid MCP URLs (returns error) +- Unreachable MCP endpoints (logs warning, uses config description) +- Missing tool in MCP response (returns error) +- Empty descriptions (logs warning, uses config description) +- Network errors (falls back to config description) + +## Backward Compatibility + +The changes are fully backward compatible: +- `tool` field is optional on Agent and AgentFilter +- Non-MCP agents work as before +- Config descriptions still work when MCP is not used + +## Future Enhancements + +Possible improvements: +1. Cache MCP tool descriptions to reduce network calls +2. Support for MCP stdio transport in addition to SSE +3. Periodic refresh of MCP descriptions +4. Support for fetching tool schemas along with descriptions +5. Metrics for MCP endpoint availability and response times diff --git a/docs/MCP_QUICK_START.md b/docs/MCP_QUICK_START.md new file mode 100644 index 00000000..c471cc96 --- /dev/null +++ b/docs/MCP_QUICK_START.md @@ -0,0 +1,132 @@ +# MCP Agent Description - Quick Start + +## What This Feature Does + +When selecting an agent for an incoming request in Brightstaff, the system can now automatically fetch tool descriptions from MCP (Model Context Protocol) endpoints. These descriptions are used by the LLM router to intelligently select the appropriate agent for handling user requests.
+ +## Configuration + +### Basic Setup + +Add agents with MCP URLs to your `arch_config.yaml`: + +```yaml +agents: + - id: rag_agent + url: mcp://host.docker.internal:10501 + + - id: query_rewriter + url: mcp://host.docker.internal:10500 + tool: rewrite_query_with_archgw # Optional: specify tool name + +listeners: + - type: agent + port: 8001 + router: arch_agent_router + agents: + - id: rag_agent + description: "RAG agent for document retrieval" # Fallback if MCP fails + filter_chain: + - query_rewriter +``` + +### MCP URL Formats + +Three formats are supported: + +```yaml +# 1. Basic - uses agent id as tool name +url: mcp://localhost:10500 + +# 2. Tool in path +url: mcp://localhost:10500/my_tool_name + +# 3. Tool as query parameter +url: mcp://localhost:10500?tool=my_tool_name +``` + +## How It Works + +1. **Request arrives** at agent listener +2. **Agent selector** needs to choose which agent to handle request +3. **For MCP agents**, description is fetched from endpoint: + ``` + GET http://host:port/sse/tools/list + Accept: text/event-stream + ``` +4. **Tool description extracted** from SSE response +5. **LLM router uses descriptions** to select best agent +6. **Selected agent processes** the request through its filter chain + +## Example MCP Response + +Your MCP server should respond to `/sse/tools/list` with: + +``` +data: {"tools": [{"name": "rewrite_query_with_archgw", "description": "Rewrites user queries using LLM for better retrieval", "inputSchema": {...}}]} +``` + +## Fallback Behavior + +If MCP endpoint fails or returns empty description: +- System logs a warning +- Falls back to `description` field from arch_config.yaml +- Processing continues normally + +## Logging + +Enable debug logging to see MCP interactions: + +```bash +RUST_LOG=debug cargo run +``` + +Look for logs like: +``` +Agent rag_agent is an MCP agent, fetching tool description from: mcp://host.docker.internal:10501 +Fetched MCP description for agent rag_agent: Rewrites user queries... 
+``` + +## Testing + +Test your MCP endpoint manually: + +```bash +# Check if endpoint is accessible +curl -H "Accept: text/event-stream" http://localhost:10500/sse/tools/list + +# Expected response format +data: {"tools": [{"name": "my_tool", "description": "My tool description"}]} +``` + +## Troubleshooting + +### "Failed to fetch MCP description" +- Check if MCP server is running +- Verify URL format is correct +- Ensure `/sse/tools/list` endpoint exists +- Check network connectivity + +### "MCP tool description is empty" +- Verify MCP server returns tool in response +- Check tool name matches configuration +- Ensure `description` field is populated in MCP response + +### "Tool not found" +- Verify tool name in config matches MCP server +- Check if tool is listed in `/sse/tools/list` response +- Try without explicit tool name (uses agent id) + +## Best Practices + +1. **Always provide fallback descriptions** in arch_config.yaml +2. **Use descriptive tool names** that match your config +3. **Start MCP servers** before starting Brightstaff, and keep them running +4. **Monitor logs** for MCP fetch failures +5. **Test MCP endpoints** independently before integration + +## See Also + +- [MCP Agent Integration Documentation](./MCP_AGENT_INTEGRATION.md) +- [RAG Agent Demo](../demos/use_cases/rag_agent/README.md) +- [Agent Configuration Reference](../demos/use_cases/rag_agent/arch_config.yaml)