more changes

This commit is contained in:
Adil Hafeez 2025-11-28 11:34:43 -08:00
parent dcfc85ca74
commit 4140c1cde4
No known key found for this signature in database
GPG key ID: 9B18EF7691369645
15 changed files with 883 additions and 109 deletions

View file

@ -14,6 +14,10 @@ properties:
type: array
items:
type: object
agent_filters:
type: array
items:
type: object
listeners:
oneOf:
- type: array

View file

@ -124,20 +124,25 @@ async fn handle_agent_chat(
.find(|(key, _)| key.as_str() == "traceparent")
.map(|(_, value)| value.to_str().unwrap_or_default().to_string());
// Select appropriate agent using arch router llm model
let selected_agent = agent_selector
.select_agent(&chat_completions_request.messages, &listener, trace_parent)
.await?;
debug!("Processing agent pipeline: {}", selected_agent.id);
// Create agent map for pipeline processing
// Create agent map for pipeline processing and agent selection
let agent_map = {
let agents = agents_list.read().await;
let agents = agents.as_ref().unwrap();
agent_selector.create_agent_map(agents)
};
// Select appropriate agent using arch router llm model
let selected_agent = agent_selector
.select_agent(
&chat_completions_request.messages,
&listener,
trace_parent,
&agent_map,
)
.await?;
debug!("Processing agent pipeline: {}", selected_agent.id);
// Process the filter chain
let processed_messages = pipeline_processor
.process_filter_chain(

View file

@ -8,6 +8,7 @@ use hermesllm::apis::openai::Message;
use tracing::{debug, warn};
use crate::router::llm_router::RouterService;
use crate::utils::mcp_client::McpClient;
/// Errors that can occur during agent selection
#[derive(Debug, thiserror::Error)]
@ -20,16 +21,22 @@ pub enum AgentSelectionError {
RoutingError(String),
#[error("Default agent not found for listener: {0}")]
DefaultAgentNotFound(String),
#[error("MCP client error: {0}")]
McpError(String),
}
/// Service for selecting agents based on routing preferences and listener configuration
pub struct AgentSelector {
router_service: Arc<RouterService>,
mcp_client: McpClient,
}
impl AgentSelector {
pub fn new(router_service: Arc<RouterService>) -> Self {
Self { router_service }
Self {
router_service,
mcp_client: McpClient::new(),
}
}
/// Find listener by name from the request headers
@ -65,6 +72,7 @@ impl AgentSelector {
messages: &[Message],
listener: &Listener,
trace_parent: Option<String>,
agent_map: &HashMap<String, Agent>,
) -> Result<AgentFilterChain, AgentSelectionError> {
let agents = listener
.agents
@ -77,7 +85,9 @@ impl AgentSelector {
return Ok(agents[0].clone());
}
let usage_preferences = self.convert_agent_description_to_routing_preferences(agents);
let usage_preferences = self
.convert_agent_description_to_routing_preferences(agents, agent_map)
.await;
debug!(
"Agents usage preferences for agent routing str: {}",
serde_json::to_string(&usage_preferences).unwrap_or_default()
@ -131,20 +141,75 @@ impl AgentSelector {
}
/// Convert agent descriptions to routing preferences
fn convert_agent_description_to_routing_preferences(
/// For agents with MCP URLs, fetches the tool description from the MCP server
async fn convert_agent_description_to_routing_preferences(
&self,
agents: &[AgentFilterChain],
agent_map: &HashMap<String, Agent>,
) -> Vec<ModelUsagePreference> {
agents
.iter()
.map(|agent| ModelUsagePreference {
model: agent.id.clone(),
let mut preferences = Vec::new();
for agent_chain in agents {
// Get the actual agent from the agent_map
let agent = agent_map.get(&agent_chain.id);
// Determine the description to use
let description = if let Some(agent) = agent {
// Check if this is an MCP agent (URL starts with mcp://)
if agent.url.starts_with("mcp://") {
debug!(
"Agent {} is an MCP agent, fetching tool description from: {}",
agent.id, agent.url
);
// Fetch description from MCP endpoint
match self
.mcp_client
.fetch_tool_description(&agent.url, agent.tool.as_deref())
.await
{
Ok(mcp_description) => {
if !mcp_description.is_empty() {
debug!(
"Fetched MCP description for agent {}: {}",
agent.id, mcp_description
);
mcp_description
} else {
warn!(
"MCP tool description is empty for agent {}, using config description",
agent.id
);
agent_chain.description.clone().unwrap_or_default()
}
}
Err(e) => {
warn!(
"Failed to fetch MCP description for agent {}: {}, using config description",
agent.id, e
);
agent_chain.description.clone().unwrap_or_default()
}
}
} else {
// Not an MCP agent, use description from config
agent_chain.description.clone().unwrap_or_default()
}
} else {
// Agent not found in map, use description from config
agent_chain.description.clone().unwrap_or_default()
};
preferences.push(ModelUsagePreference {
model: agent_chain.id.clone(),
routing_preferences: vec![RoutingPreference {
name: agent.id.clone(),
description: agent.description.as_ref().unwrap_or(&String::new()).clone(),
name: agent_chain.id.clone(),
description,
}],
})
.collect()
});
}
preferences
}
}
@ -185,6 +250,7 @@ mod tests {
id: name.to_string(),
kind: Some("test".to_string()),
url: "http://localhost:8080".to_string(),
tool: None,
}
}
@ -240,8 +306,8 @@ mod tests {
assert!(agent_map.contains_key("agent2"));
}
#[test]
fn test_convert_agent_description_to_routing_preferences() {
#[tokio::test]
async fn test_convert_agent_description_to_routing_preferences() {
let router_service = create_test_router_service();
let selector = AgentSelector::new(router_service);
@ -250,7 +316,15 @@ mod tests {
create_test_agent("agent2", "Second agent description", false),
];
let preferences = selector.convert_agent_description_to_routing_preferences(&agents);
let agent_structs = vec![
create_test_agent_struct("agent1"),
create_test_agent_struct("agent2"),
];
let agent_map = selector.create_agent_map(&agent_structs);
let preferences = selector
.convert_agent_description_to_routing_preferences(&agents, &agent_map)
.await;
assert_eq!(preferences.len(), 2);
assert_eq!(preferences[0].model, "agent1");

View file

@ -65,6 +65,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_providers = Arc::new(RwLock::new(arch_config.model_providers.clone()));
let agents_list = Arc::new(RwLock::new(arch_config.agents.clone()));
let agent_filters = Arc::new(RwLock::new(arch_config.agent_filters.clone()));
let listeners = Arc::new(RwLock::new(arch_config.listeners.clone()));
debug!(
@ -111,6 +112,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_providers = llm_providers.clone();
let agents_list = agents_list.clone();
let agent_filters = agent_filters.clone();
let listeners = listeners.clone();
let service = service_fn(move |req| {
let router_service = Arc::clone(&router_service);
@ -119,6 +121,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
let llm_providers = llm_providers.clone();
let model_aliases = Arc::clone(&model_aliases);
let agents_list = agents_list.clone();
let agent_filters = agent_filters.clone();
let listeners = listeners.clone();
async move {

View file

@ -0,0 +1,235 @@
use reqwest::Client;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use tracing::{debug, warn};
/// MCP Tool definition from tools/list response
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct McpTool {
pub name: String,
pub description: Option<String>,
#[serde(rename = "inputSchema")]
pub input_schema: Option<serde_json::Value>,
}
/// Response from MCP tools/list endpoint
#[derive(Debug, Serialize, Deserialize)]
struct McpToolsListResponse {
tools: Vec<McpTool>,
}
/// Errors that can occur during MCP communication
#[derive(Debug, thiserror::Error)]
pub enum McpClientError {
#[error("HTTP request failed: {0}")]
HttpError(#[from] reqwest::Error),
#[error("Failed to parse response: {0}")]
ParseError(#[from] serde_json::Error),
#[error("Invalid MCP URL: {0}")]
InvalidUrl(String),
#[error("Tool not found: {0}")]
ToolNotFound(String),
}
/// Client for communicating with MCP (Model Context Protocol) servers
pub struct McpClient {
client: Client,
}
impl Default for McpClient {
fn default() -> Self {
Self::new()
}
}
impl McpClient {
pub fn new() -> Self {
Self {
client: Client::new(),
}
}
/// Parse MCP URL to extract host, port, and optional tool name
/// Supports formats:
/// - mcp://host:port
/// - mcp://host:port/tool_name
/// - mcp://host:port?tool=tool_name
fn parse_mcp_url(&self, mcp_url: &str) -> Result<(String, Option<String>), McpClientError> {
// Remove mcp:// prefix
let url_without_scheme = mcp_url
.strip_prefix("mcp://")
.ok_or_else(|| McpClientError::InvalidUrl(format!("URL must start with mcp://: {}", mcp_url)))?;
// Parse host:port and optional tool
let base_url: String;
let mut tool_name: Option<String> = None;
if let Some(query_start) = url_without_scheme.find('?') {
// Format: mcp://host:port?tool=tool_name
base_url = url_without_scheme[..query_start].to_string();
let query = &url_without_scheme[query_start + 1..];
// Parse query parameters
for param in query.split('&') {
if let Some((key, value)) = param.split_once('=') {
if key == "tool" {
tool_name = Some(value.to_string());
}
}
}
} else if let Some(path_start) = url_without_scheme.find('/') {
// Format: mcp://host:port/tool_name
base_url = url_without_scheme[..path_start].to_string();
tool_name = Some(url_without_scheme[path_start + 1..].to_string());
} else {
// Format: mcp://host:port
base_url = url_without_scheme.to_string();
}
Ok((format!("http://{}", base_url), tool_name))
}
/// Fetch list of tools from MCP server via SSE
pub async fn fetch_tools(&self, mcp_url: &str) -> Result<Vec<McpTool>, McpClientError> {
let (http_url, _) = self.parse_mcp_url(mcp_url)?;
let tools_list_url = format!("{}/sse/tools/list", http_url);
debug!("Fetching tools from MCP endpoint: {}", tools_list_url);
let response = self.client
.get(&tools_list_url)
.header("Accept", "text/event-stream")
.send()
.await?;
if !response.status().is_success() {
warn!(
"Failed to fetch tools from {}: status {}",
tools_list_url,
response.status()
);
return Ok(Vec::new());
}
let body = response.text().await?;
debug!("Received tools list response: {}", body);
// Parse SSE response - looking for data: lines
let mut tools = Vec::new();
for line in body.lines() {
if let Some(data) = line.strip_prefix("data: ") {
if data.trim() == "[DONE]" {
break;
}
match serde_json::from_str::<McpToolsListResponse>(data) {
Ok(response) => {
tools.extend(response.tools);
}
Err(e) => {
debug!("Failed to parse tools list data: {}, line: {}", e, data);
}
}
}
}
debug!("Fetched {} tools from MCP server", tools.len());
Ok(tools)
}
/// Fetch specific tool description from MCP server
/// If tool_name is None, uses the tool name from the URL or returns the first tool
pub async fn fetch_tool_description(
&self,
mcp_url: &str,
tool_name_override: Option<&str>,
) -> Result<String, McpClientError> {
let (_, url_tool_name) = self.parse_mcp_url(mcp_url)?;
// Determine which tool to look for
let target_tool_name = tool_name_override
.or(url_tool_name.as_deref())
.ok_or_else(|| {
McpClientError::InvalidUrl(
"No tool name specified in URL or parameter".to_string()
)
})?;
debug!("Fetching description for tool: {}", target_tool_name);
let tools = self.fetch_tools(mcp_url).await?;
let tool = tools
.iter()
.find(|t| t.name == target_tool_name)
.ok_or_else(|| McpClientError::ToolNotFound(target_tool_name.to_string()))?;
Ok(tool.description.clone().unwrap_or_default())
}
/// Fetch all tools as a map of tool name to description
pub async fn fetch_tools_map(
&self,
mcp_url: &str,
) -> Result<HashMap<String, String>, McpClientError> {
let tools = self.fetch_tools(mcp_url).await?;
Ok(tools
.into_iter()
.map(|tool| {
(tool.name, tool.description.unwrap_or_default())
})
.collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parse_mcp_url_basic() {
let client = McpClient::new();
let (http_url, tool) = client.parse_mcp_url("mcp://localhost:10500").unwrap();
assert_eq!(http_url, "http://localhost:10500");
assert_eq!(tool, None);
}
#[test]
fn test_parse_mcp_url_with_path() {
let client = McpClient::new();
let (http_url, tool) = client.parse_mcp_url("mcp://localhost:10500/rewrite_query").unwrap();
assert_eq!(http_url, "http://localhost:10500");
assert_eq!(tool, Some("rewrite_query".to_string()));
}
#[test]
fn test_parse_mcp_url_with_query_param() {
let client = McpClient::new();
let (http_url, tool) = client.parse_mcp_url("mcp://localhost:10500?tool=rewrite_query").unwrap();
assert_eq!(http_url, "http://localhost:10500");
assert_eq!(tool, Some("rewrite_query".to_string()));
}
#[test]
fn test_parse_mcp_url_with_host_docker_internal() {
let client = McpClient::new();
let (http_url, tool) = client
.parse_mcp_url("mcp://host.docker.internal:10500/context_builder")
.unwrap();
assert_eq!(http_url, "http://host.docker.internal:10500");
assert_eq!(tool, Some("context_builder".to_string()));
}
#[test]
fn test_parse_mcp_url_invalid() {
let client = McpClient::new();
let result = client.parse_mcp_url("http://localhost:10500");
assert!(result.is_err());
}
}

View file

@ -1 +1,2 @@
pub mod mcp_client;
pub mod tracing;

View file

@ -23,6 +23,14 @@ pub struct Agent {
pub id: String,
pub kind: Option<String>,
pub url: String,
pub tool: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentFilter {
pub id: String,
pub url: String,
pub tool: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -57,6 +65,7 @@ pub struct Configuration {
pub mode: Option<GatewayMode>,
pub routing: Option<Routing>,
pub agents: Option<Vec<Agent>>,
pub agent_filters: Option<Vec<AgentFilter>>,
pub listeners: Vec<Listener>,
}

View file

@ -1,28 +1,148 @@
# RAG Agent Query Parser
# RAG Agent with MCP Protocol
A FastAPI service that rewrites user queries using archgw and gpt-4o-mini for better retrieval accuracy.
A multi-agent RAG system using the Model Context Protocol (MCP) for agent communication.
## How it Works
## Architecture
1. Receives a chat completion request with conversation history
2. Calls archgw's LLM gateway with gpt-4o-mini to rewrite the last user query
3. Returns the rewritten query as the assistant response
This demo consists of three MCP agents:
1. **Query Rewriter** - Rewrites user queries for better retrieval
2. **Context Builder** - Retrieves relevant context from knowledge base
3. **Response Generator** - Generates final responses with context
Each agent runs as an independent MCP server and exposes tools that can be called via the MCP protocol.
## MCP Tools
### Query Rewriter Agent
- **Tool**: `rewrite_query_with_archgw`
- **Description**: Rewrites user queries using LLM for better retrieval
- **Port**: 10500
### Context Builder Agent
- **Tool**: `chat_completions`
- **Description**: Augments queries with relevant context from knowledge base
- **Port**: 10501
### Response Generator Agent
- **Port**: 10502
## Setup and Running
1. **Start archgw**:
```bash
archgw up --foreground
```
### 1. Start archgw
```bash
archgw up --foreground
```
2. **Start the query parser service**:
```bash
uv run python -m rag_agent.query_parser
```
### 2. Start Individual Agents
**Query Rewriter:**
```bash
uv run python -m rag_agent \
--agent query_rewriter \
--host 0.0.0.0 \
--port 10500 \
--transport sse
```
**Context Builder:**
```bash
uv run python -m rag_agent \
--agent context_builder \
--host 0.0.0.0 \
--port 10501 \
--transport sse
```
**Response Generator:**
```bash
uv run python -m rag_agent \
--agent response_generator \
--host 0.0.0.0 \
--port 10502 \
--transport sse
```
### 3. Start All Agents at Once
```bash
./start_agents.sh
```
## Configuration
The `arch_config.yaml` defines how agents are connected:
```yaml
agent_filters:
- id: query_rewriter
url: mcp://host.docker.internal:10500
tool: rewrite_query_with_archgw # MCP tool name
- id: context_builder
url: mcp://host.docker.internal:10501
tool: chat_completions
```
### MCP Tool Invocation Patterns
The config supports different ways to specify MCP tools:
**1. Separate tool field (recommended):**
```yaml
- id: query_rewriter
url: mcp://host.docker.internal:10500
tool: rewrite_query_with_archgw
```
**2. Tool in URL path:**
```yaml
- id: query_rewriter
url: mcp://host.docker.internal:10500/rewrite_query_with_archgw
```
**3. Tool as query parameter:**
```yaml
- id: query_rewriter
url: mcp://host.docker.internal:10500?tool=rewrite_query_with_archgw
```
## CLI Options
```bash
uv run python -m rag_agent --help
Options:
--transport TEXT Transport type: stdio or sse (default: sse)
--host TEXT Host to bind MCP server to (default: localhost)
--port INTEGER Port for MCP server (default: 10500)
--agent TEXT Agent name: query_rewriter, context_builder, or response_generator (required)
--name TEXT Custom MCP server name (optional)
```
## Environment Variables
```bash
# archgw LLM Gateway base URL (default: http://localhost:12000/v1)
export LLM_GATEWAY_ENDPOINT="http://localhost:12000/v1"
# OpenAI API Key for model providers
export OPENAI_API_KEY="your-key-here"
```
## Testing
See `sample_queries.md` for example queries to test the RAG system.
Example request:
```bash
curl -X POST http://localhost:8001/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"model": "gpt-4o",
"messages": [
{
"role": "user",
"content": "What is the guaranteed uptime for TechCorp?"
}
]
}'
```

View file

@ -1,12 +1,24 @@
version: v0.3.0
agents:
- id: query_rewriter
url: http://host.docker.internal:10500/v1/chat/completions
- id: context_builder
url: http://host.docker.internal:10501/v1/chat/completions
- id: rag_agent
url: http://host.docker.internal:10502/v1/chat/completions
url: mcp://host.docker.internal:10501
# only sse is supported
# transport: sse or stdio
# optional tool name, defaults to "invoke"
# tool: invoke
- id: travel_agent
url: mcp://host.docker.internal:10502
agent_filters:
- id: query_rewriter
url: mcp://host.docker.internal:10500
# tool is optional, defaults to id
# tool: query_rewriter
- id: context_builder
url: mcp://host.docker.internal:10500
- id: input_guards
url: mcp://host.docker.internal:10500
model_providers:
- model: openai/gpt-4o-mini
@ -23,15 +35,20 @@ model_aliases:
listeners:
- type: agent
name: agent_1
port: 8001
router: arch_agent_router
agents:
- id: rag_agent
description: virtual assistant for device contracts for simple queries
description: virtual assistant for retrieval augmented generation tasks
filter_chain:
- input_guards
- query_rewriter
- context_builder
- id: travel_agent
description: virtual assistant for travel bookings and recommendations
filter_chain:
- input_guards
tracing:
random_sampling: 100

View file

@ -5,57 +5,45 @@ mcp = None
@click.command()
@click.option("--transport", "transport", default="stdio")
@click.option("--host", "host", default="localhost")
@click.option("--port", "port", default=10101)
@click.option("--agent", "agent", default=None)
@click.option(
"--rest-server",
"rest_server",
is_flag=True,
help="Start REST server instead of MCP server",
)
@click.option("--rest-port", "rest_port", default=8000, help="Port for REST server")
def main(host, port, agent, transport, rest_server, rest_port):
if rest_server:
print(f"Starting REST server on {host}:{rest_port} for agent: {agent}")
if agent == "query_parser":
from rag_agent.query_rewriter_agent import start_server
start_server(host=host, port=rest_port)
return
elif agent == "context_builder":
from rag_agent.context_builder_agent import (
start_server,
)
start_server(host=host, port=rest_port)
return
elif agent == "response_generator":
from rag_agent.response_generator_agent import start_server
start_server(host=host, port=rest_port)
return
else:
print("Please specify an agent to start with --agent option.")
return
print(f"Starting agent(s): {agent if agent else 'all'}")
@click.option("--transport", "transport", default="sse", help="Transport type: stdio or sse")
@click.option("--host", "host", default="localhost", help="Host to bind MCP server to")
@click.option("--port", "port", type=int, default=10500, help="Port for MCP server")
@click.option("--agent", "agent", required=True, help="Agent name: query_rewriter, context_builder, or response_generator")
@click.option("--name", "agent_name", default=None, help="Custom MCP server name (defaults to agent type)")
def main(host, port, agent, transport, agent_name):
"""Start a RAG agent as an MCP server."""
# Map friendly names to agent modules
agent_map = {
"query_rewriter": ("rag_agent.query_rewriter", "Query Rewriter Agent"),
"context_builder": ("rag_agent.context_builder_agent", "Context Builder Agent"),
"response_generator": ("rag_agent.response_generator", "Response Generator Agent"),
}
if agent not in agent_map:
print(f"Error: Unknown agent '{agent}'")
print(f"Available agents: {', '.join(agent_map.keys())}")
return
module_name, default_name = agent_map[agent]
mcp_name = agent_name or default_name
print(f"Starting MCP server: {mcp_name}")
print(f" Agent: {agent}")
print(f" Transport: {transport}")
print(f" Host: {host}")
print(f" Port: {port}")
global mcp
mcp = FastMCP("RAG Agent Demo", host=host, port=port)
if agent == "query_parser":
import rag_agent.query_parser
elif agent == "document_store":
import rag_agent.document_store
elif agent == "response_generator":
import rag_agent.response_generator
else:
import rag_agent.query_parser
import rag_agent.document_store
import rag_agent.response_generator
print("All agents loaded.")
mcp = FastMCP(mcp_name, host=host, port=port)
# Import the agent module to register its tools
import importlib
importlib.import_module(module_name)
print(f"Agent '{agent}' loaded successfully")
print(f"MCP server ready on {transport}://{host}:{port}")
mcp.run(transport=transport)

View file

@ -10,7 +10,8 @@ from pathlib import Path
import uvicorn
from .api import ChatMessage, ChatCompletionRequest, ChatCompletionResponse
from . import mcp
from fastmcp.server.dependencies import get_http_headers
# Set up logging
logging.basicConfig(
@ -190,12 +191,12 @@ class Response(BaseModel):
# FastAPI app for REST server
app = FastAPI(title="RAG Content Builder Agent", version="1.0.0")
@mcp.tool()
@app.post("/v1/chat/completions")
async def chat_completions(
request_body: ChatCompletionRequest, request: Request
async def context_builder(
request_body: ChatCompletionRequest
) -> ChatCompletionResponse:
"""Chat completions endpoint that augments user queries with relevant context from the knowledge base."""
""" chat completions endpoint that augments user queries with relevant context from the knowledge base."""
import time
import uuid
@ -203,8 +204,10 @@ async def chat_completions(
f"Received chat completion request with {len(request_body.messages)} messages"
)
# Read traceparent header if present
traceparent_header = request.headers.get("traceparent")
# Get traceparent header from HTTP request using FastMCP's dependency function
headers = get_http_headers()
traceparent_header = headers.get("traceparent")
if traceparent_header:
logger.info(f"Received traceparent header: {traceparent_header}")
else:

View file

@ -8,7 +8,8 @@ import logging
import uvicorn
from .api import ChatMessage, ChatCompletionRequest, ChatCompletionResponse
from . import mcp
from fastmcp.server.dependencies import get_http_headers
# Set up logging
logging.basicConfig(
@ -28,11 +29,10 @@ archgw_client = AsyncOpenAI(
api_key="EMPTY", # archgw doesn't require a real API key
)
async def rewrite_query_with_archgw(
messages: List[ChatMessage], traceparent_header: str
) -> str:
# Prepare the system prompt for query rewriting
""" Rewrite the user query using LLM for better retrieval. """
system_prompt = """You are a query rewriter that improves user queries for better retrieval.
Given a conversation history, rewrite the last user message to be more specific and context-aware.
@ -90,7 +90,8 @@ app = FastAPI(title="RAG Agent Query Parser", version="1.0.0")
@app.post("/v1/chat/completions")
async def chat_completions(request_body: ChatCompletionRequest, request: Request):
@mcp.tool()
async def query_rewriter(request_body: ChatCompletionRequest):
"""Chat completions endpoint that rewrites the last user query using archgw."""
import time
import uuid
@ -99,8 +100,10 @@ async def chat_completions(request_body: ChatCompletionRequest, request: Request
f"Received chat completion request with {len(request_body.messages)} messages"
)
# Read traceparent header if present
traceparent_header = request.headers.get("traceparent")
# Get traceparent header from HTTP request using FastMCP's dependency function
headers = get_http_headers()
traceparent_header = headers.get("traceparent")
if traceparent_header:
logger.info(f"Received traceparent header: {traceparent_header}")
else:

View file

@ -15,6 +15,9 @@ from .api import (
ChatCompletionStreamResponse,
)
from . import mcp
from fastmcp.server.dependencies import get_http_headers
# Set up logging
logging.basicConfig(
level=logging.INFO,
@ -60,14 +63,17 @@ def prepare_response_messages(request_body: ChatCompletionRequest):
@app.post("/v1/chat/completions")
async def chat_completions(request_body: ChatCompletionRequest, request: Request):
@mcp.tool(name="invoke")
async def chat_completion(request_body: ChatCompletionRequest):
"""Chat completions endpoint that generates a coherent response based on all context."""
logger.info(
f"Received chat completion request with {len(request_body.messages)} messages"
)
# Read traceparent header if present
traceparent_header = request.headers.get("traceparent")
# Get traceparent header from HTTP request using FastMCP's dependency function
headers = get_http_headers()
traceparent_header = headers.get("traceparent")
if traceparent_header:
logger.info(f"Received traceparent header: {traceparent_header}")
else:

View file

@ -0,0 +1,174 @@
# MCP Agent Description Integration
## Overview
This implementation adds support for fetching agent tool descriptions from MCP (Model Context Protocol) endpoints during agent selection. This allows the system to use the actual tool descriptions from MCP servers for intelligent agent routing instead of relying solely on static descriptions in the configuration file.
## Architecture
### Components Modified
1. **MCP Client Module** (`brightstaff/src/utils/mcp_client.rs`)
- New module that handles communication with MCP servers via SSE
- Fetches tool lists and descriptions from MCP endpoints
- Parses MCP URLs in multiple formats
2. **Configuration Structs** (`common/src/configuration.rs`)
- Added optional `tool` field to `Agent` struct
- Added optional `tool` field to `AgentFilter` struct
- Supports specifying which MCP tool to invoke
3. **Agent Selector** (`brightstaff/src/handlers/agent_selector.rs`)
- Enhanced to fetch tool descriptions from MCP endpoints
- Uses MCP descriptions for agent routing when available
- Falls back to configuration descriptions if MCP fetch fails
4. **Agent Chat Completions** (`brightstaff/src/handlers/agent_chat_completions.rs`)
- Updated to pass agent_map to agent selector
- Ensures agent information is available during selection
## How It Works
### 1. Configuration
Agents can now be configured with MCP URLs and optional tool names:
```yaml
agents:
- id: rag_agent
url: mcp://host.docker.internal:10501
tool: invoke # Optional: defaults to agent id
- id: travel_agent
url: mcp://host.docker.internal:10502
agent_filters:
- id: query_rewriter
url: mcp://host.docker.internal:10500
tool: rewrite_query_with_archgw # Optional
```
### 2. MCP URL Parsing
The MCP client supports three URL formats:
```
mcp://host:port # Basic format
mcp://host:port/tool_name # Tool in path
mcp://host:port?tool=tool_name # Tool as query param
```
### 3. Description Fetching
During agent selection:
1. Agent selector checks if agent URL starts with `mcp://`
2. If yes, calls MCP client to fetch tool description from endpoint
3. MCP client makes GET request to `http://host:port/sse/tools/list`
4. Parses SSE response to extract tool descriptions
5. Returns description for specified tool (from config or URL)
6. Falls back to config description if MCP fetch fails
### 4. Agent Routing
The fetched MCP tool descriptions are used in routing preferences:
```rust
ModelUsagePreference {
model: agent_id,
routing_preferences: vec![RoutingPreference {
name: agent_id,
description: mcp_description, // From MCP endpoint
}],
}
```
The LLM router uses these descriptions to select the appropriate agent based on the user's request.
## Key Features
1. **Automatic Description Fetching**: Tool descriptions are automatically fetched from MCP servers when agents have `mcp://` URLs
2. **Graceful Fallback**: If MCP endpoint is unavailable or doesn't return a description, falls back to the description in arch_config.yaml
3. **Multiple URL Formats**: Supports flexible MCP URL specification for tool names
4. **Async Operation**: All MCP fetching is done asynchronously to avoid blocking
5. **Comprehensive Logging**: Debug and warning logs track MCP interactions
## Example Usage
### Configuration (arch_config.yaml)
```yaml
version: v0.3.0
agents:
- id: rag_agent
url: mcp://host.docker.internal:10501
# Description will be fetched from MCP endpoint
agent_filters:
- id: query_rewriter
url: mcp://host.docker.internal:10500
tool: rewrite_query_with_archgw
listeners:
- type: agent
port: 8001
router: arch_agent_router
agents:
- id: rag_agent
description: fallback description if MCP unavailable
filter_chain:
- query_rewriter
```
### Flow
1. User sends request to listener
2. Agent selector creates agent map from configuration
3. For each agent in listener:
- Checks if agent URL is MCP (`mcp://`)
- Fetches tool description from MCP endpoint
- Uses fetched description for routing
4. LLM router selects best agent based on descriptions
5. Filter chain processes request using selected agent
## Testing
Unit tests cover:
- MCP URL parsing (basic, path, query param formats)
- Agent selection with MCP descriptions
- Fallback to config descriptions
- Agent map creation
Integration tests verify:
- End-to-end agent selection flow
- Pipeline processing with MCP agents
## Error Handling
The implementation handles:
- Invalid MCP URLs (returns error)
- Unreachable MCP endpoints (logs warning, uses config description)
- Missing tool in MCP response (returns error)
- Empty descriptions (logs warning, uses config description)
- Network errors (falls back to config description)
## Backward Compatibility
The changes are fully backward compatible:
- `tool` field is optional on Agent and AgentFilter
- Non-MCP agents work as before
- Config descriptions still work when MCP is not used
## Future Enhancements
Possible improvements:
1. Cache MCP tool descriptions to reduce network calls
2. Support for MCP stdio transport in addition to SSE
3. Periodic refresh of MCP descriptions
4. Support for fetching tool schemas along with descriptions
5. Metrics for MCP endpoint availability and response times

132
docs/MCP_QUICK_START.md Normal file
View file

@ -0,0 +1,132 @@
# MCP Agent Description - Quick Start
## What This Feature Does
When processing agent filter chains in Brightstaff, the system can now automatically fetch tool descriptions from MCP (Model Context Protocol) endpoints. These descriptions are used by the LLM router to intelligently select the appropriate agent for handling user requests.
## Configuration
### Basic Setup
Add agents with MCP URLs to your `arch_config.yaml`:
```yaml
agents:
- id: rag_agent
url: mcp://host.docker.internal:10501
- id: query_rewriter
url: mcp://host.docker.internal:10500
tool: rewrite_query_with_archgw # Optional: specify tool name
listeners:
- type: agent
port: 8001
router: arch_agent_router
agents:
- id: rag_agent
description: "RAG agent for document retrieval" # Fallback if MCP fails
filter_chain:
- query_rewriter
```
### MCP URL Formats
Three formats are supported:
```yaml
# 1. Basic - uses agent id as tool name
url: mcp://localhost:10500
# 2. Tool in path
url: mcp://localhost:10500/my_tool_name
# 3. Tool as query parameter
url: mcp://localhost:10500?tool=my_tool_name
```
## How It Works
1. **Request arrives** at agent listener
2. **Agent selector** needs to choose which agent to handle request
3. **For MCP agents**, description is fetched from endpoint:
```
GET http://host:port/sse/tools/list
Accept: text/event-stream
```
4. **Tool description extracted** from SSE response
5. **LLM router uses descriptions** to select best agent
6. **Selected agent processes** the request through its filter chain
## Example MCP Response
Your MCP server should respond to `/sse/tools/list` with:
```
data: {"tools": [{"name": "rewrite_query_with_archgw", "description": "Rewrites user queries using LLM for better retrieval", "inputSchema": {...}}]}
```
## Fallback Behavior
If MCP endpoint fails or returns empty description:
- System logs a warning
- Falls back to `description` field from arch_config.yaml
- Processing continues normally
## Logging
Enable debug logging to see MCP interactions:
```bash
RUST_LOG=debug cargo run
```
Look for logs like:
```
Agent rag_agent is an MCP agent, fetching tool description from: mcp://host.docker.internal:10501
Fetched MCP description for agent rag_agent: Rewrites user queries...
```
## Testing
Test your MCP endpoint manually:
```bash
# Check if endpoint is accessible
curl -H "Accept: text/event-stream" http://localhost:10500/sse/tools/list
# Expected response format
data: {"tools": [{"name": "my_tool", "description": "My tool description"}]}
```
## Troubleshooting
### "Failed to fetch MCP description"
- Check if MCP server is running
- Verify URL format is correct
- Ensure `/sse/tools/list` endpoint exists
- Check network connectivity
### "MCP tool description is empty"
- Verify MCP server returns tool in response
- Check tool name matches configuration
- Ensure `description` field is populated in MCP response
### "Tool not found"
- Verify tool name in config matches MCP server
- Check if tool is listed in `/sse/tools/list` response
- Try without explicit tool name (uses agent id)
## Best Practices
1. **Always provide fallback descriptions** in arch_config.yaml
2. **Use descriptive tool names** that match your config
3. **Keep MCP servers running** before starting Brightstaff
4. **Monitor logs** for MCP fetch failures
5. **Test MCP endpoints** independently before integration
## See Also
- [MCP Agent Integration Documentation](./MCP_AGENT_INTEGRATION.md)
- [RAG Agent Demo](../demos/use_cases/rag_agent/README.md)
- [Agent Configuration Reference](../demos/use_cases/rag_agent/arch_config.yaml)