diff --git a/surfsense_backend/app/agents/new_chat/tools/mcp_client.py b/surfsense_backend/app/agents/new_chat/tools/mcp_client.py index 437f93043..d4dbe2a0c 100644 --- a/surfsense_backend/app/agents/new_chat/tools/mcp_client.py +++ b/surfsense_backend/app/agents/new_chat/tools/mcp_client.py @@ -4,6 +4,7 @@ This module provides a client for communicating with MCP servers via stdio trans It handles server lifecycle management, tool discovery, and tool execution. """ +import asyncio import logging import os from contextlib import asynccontextmanager @@ -14,6 +15,11 @@ from mcp.client.stdio import StdioServerParameters, stdio_client logger = logging.getLogger(__name__) +# Retry configuration +MAX_RETRIES = 3 +RETRY_DELAY = 1.0 # seconds +RETRY_BACKOFF = 2.0 # exponential backoff multiplier + class MCPClient: """Client for communicating with an MCP server.""" @@ -35,44 +41,86 @@ class MCPClient: self.session: ClientSession | None = None @asynccontextmanager - async def connect(self): + async def connect(self, max_retries: int = MAX_RETRIES): """Connect to the MCP server and manage its lifecycle. + Args: + max_retries: Maximum number of connection retry attempts + Yields: ClientSession: Active MCP session for making requests + Raises: + RuntimeError: If all connection attempts fail + """ - try: - # Merge env vars with current environment - server_env = os.environ.copy() - server_env.update(self.env) + last_error = None + delay = RETRY_DELAY - # Create server parameters with env - server_params = StdioServerParameters( - command=self.command, args=self.args, env=server_env - ) + for attempt in range(max_retries): + try: + # Merge env vars with current environment + server_env = os.environ.copy() + server_env.update(self.env) - # Spawn server process and create session - # Note: Cannot combine these context managers because ClientSession - # needs the read/write streams from stdio_client - async with stdio_client(server=server_params) as (read, write): # noqa: SIM117 - async with ClientSession(read, write) as session: - # Initialize the connection - await session.initialize() - self.session = session - logger.info( - "Connected to MCP server: %s %s", - self.command, - " ".join(self.args), + # Create server parameters with env + server_params = StdioServerParameters( + command=self.command, args=self.args, env=server_env + ) + + # Spawn server process and create session + # Note: Cannot combine these context managers because ClientSession + # needs the read/write streams from stdio_client + async with stdio_client(server=server_params) as (read, write): # noqa: SIM117 + async with ClientSession(read, write) as session: + # Initialize the connection + await session.initialize() + self.session = session + + if attempt > 0: + logger.info( + "Connected to MCP server on attempt %d: %s %s", + attempt + 1, + self.command, + " ".join(self.args), + ) + else: + logger.info( + "Connected to MCP server: %s %s", + self.command, + " ".join(self.args), + ) + yield session + return # Success, exit retry loop + + except Exception as e: + last_error = e + if attempt < max_retries - 1: + logger.warning( + "MCP server connection failed (attempt %d/%d): %s. Retrying in %.1fs...", + attempt + 1, + max_retries, + e, + delay, ) - yield session + await asyncio.sleep(delay) + delay *= RETRY_BACKOFF # Exponential backoff + else: + logger.error( + "Failed to connect to MCP server after %d attempts: %s", + max_retries, + e, + exc_info=True, + ) + finally: + self.session = None - except Exception as e: - logger.error("Failed to connect to MCP server: %s", e, exc_info=True) - raise - finally: - self.session = None - logger.info("Disconnected from MCP server: %s", self.command) + # All retries exhausted + error_msg = f"Failed to connect to MCP server '{self.command}' after {max_retries} attempts" + if last_error: + error_msg += f": {last_error}" + logger.error(error_msg) + raise RuntimeError(error_msg) from last_error async def list_tools(self) -> list[dict[str, Any]]: """List all tools available from the MCP server. diff --git a/surfsense_backend/app/agents/new_chat/tools/mcp_tool.py b/surfsense_backend/app/agents/new_chat/tools/mcp_tool.py index 0e5f1b993..d7c9210af 100644 --- a/surfsense_backend/app/agents/new_chat/tools/mcp_tool.py +++ b/surfsense_backend/app/agents/new_chat/tools/mcp_tool.py @@ -90,16 +90,22 @@ async def _create_mcp_tool_from_definition( input_model = _create_dynamic_input_model_from_schema(tool_name, input_schema) async def mcp_tool_call(**kwargs) -> str: - """Execute the MCP tool call via the client.""" + """Execute the MCP tool call via the client with retry support.""" logger.info(f"MCP tool '{tool_name}' called with params: {kwargs}") try: - # Connect to server and call tool + # Connect to server and call tool (connect has built-in retry logic) async with mcp_client.connect(): result = await mcp_client.call_tool(tool_name, kwargs) return str(result) + except RuntimeError as e: + # Connection failures after all retries + error_msg = f"MCP tool '{tool_name}' connection failed after retries: {e!s}" + logger.error(error_msg) + return f"Error: {error_msg}" except Exception as e: - error_msg = f"MCP tool '{tool_name}' failed: {e!s}" + # Tool execution or other errors + error_msg = f"MCP tool '{tool_name}' execution failed: {e!s}" logger.exception(error_msg) return f"Error: {error_msg}" @@ -146,17 +152,38 @@ async def load_mcp_tools( tools: list[StructuredTool] = [] for connector in result.scalars(): try: - # Extract server config + # Early validation: Extract and validate connector config config = connector.config or {} server_config = config.get("server_config", {}) - - command = server_config.get("command") - args = server_config.get("args", []) - env = server_config.get("env", {}) - - if not command: + + # Validate server_config exists and is a dict + if not server_config or not isinstance(server_config, dict): logger.warning( - f"MCP connector {connector.id} missing command, skipping" + f"MCP connector {connector.id} (name: '{connector.name}') has invalid or missing server_config, skipping" + ) + continue + + # Validate required command field + command = server_config.get("command") + if not command or not isinstance(command, str): + logger.warning( + f"MCP connector {connector.id} (name: '{connector.name}') missing or invalid command field, skipping" + ) + continue + + # Validate args field (must be list if present) + args = server_config.get("args", []) + if not isinstance(args, list): + logger.warning( + f"MCP connector {connector.id} (name: '{connector.name}') has invalid args field (must be list), skipping" + ) + continue + + # Validate env field (must be dict if present) + env = server_config.get("env", {}) + if not isinstance(env, dict): + logger.warning( + f"MCP connector {connector.id} (name: '{connector.name}') has invalid env field (must be dict), skipping" ) continue @@ -172,22 +199,21 @@ async def load_mcp_tools( f"'{command}' (connector {connector.id})" ) - # Create LangChain tools from definitions - for tool_def in tool_definitions: - try: - tool = await _create_mcp_tool_from_definition( - tool_def, mcp_client - ) - tools.append(tool) - except Exception as e: - logger.exception( - f"Failed to create tool '{tool_def.get('name')}' " - f"from connector {connector.id}: {e!s}", - ) - + # Create LangChain tools from definitions + for tool_def in tool_definitions: + try: + tool = await _create_mcp_tool_from_definition( + tool_def, mcp_client + ) + tools.append(tool) + except Exception as e: + logger.exception( + f"Failed to create tool '{tool_def.get('name')}' " + f"from connector {connector.id}: {e!s}" + ) except Exception as e: logger.exception( - f"Failed to load tools from MCP connector {connector.id}: {e!s}", + f"Failed to load tools from MCP connector {connector.id}: {e!s}" ) logger.info(f"Loaded {len(tools)} MCP tools for search space {search_space_id}") diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 9c6c2fc3f..6201452a9 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -2108,7 +2108,7 @@ async def create_mcp_connector( "You don't have permission to create connectors in this search space", ) - # Create the connector with server config + # Create the connector with single server config db_connector = SearchSourceConnector( name=connector_data.name, connector_type=SearchSourceConnectorType.MCP_CONNECTOR, @@ -2125,7 +2125,7 @@ async def create_mcp_connector( await session.refresh(db_connector) logger.info( - f"Created MCP connector {db_connector.id} for server '{connector_data.server_config.command}' " + f"Created MCP connector {db_connector.id} " f"for user {user.id} in search space {search_space_id}" ) diff --git a/surfsense_backend/app/schemas/search_source_connector.py b/surfsense_backend/app/schemas/search_source_connector.py index 5fd7a5aab..b45645053 100644 --- a/surfsense_backend/app/schemas/search_source_connector.py +++ b/surfsense_backend/app/schemas/search_source_connector.py @@ -95,7 +95,7 @@ class MCPConnectorCreate(BaseModel): """Schema for creating an MCP connector.""" name: str - server_config: MCPServerConfig + server_config: MCPServerConfig # Single MCP server configuration class MCPConnectorUpdate(BaseModel): @@ -106,7 +106,7 @@ class MCPConnectorUpdate(BaseModel): class MCPConnectorRead(BaseModel): - """Schema for reading an MCP connector with server config.""" + """Schema for reading an MCP connector with server configs.""" id: int name: str @@ -123,7 +123,8 @@ class MCPConnectorRead(BaseModel): def from_connector(cls, connector: SearchSourceConnectorRead) -> "MCPConnectorRead": """Convert from base SearchSourceConnectorRead.""" config = connector.config or {} - server_config = MCPServerConfig(**config.get("server_config", {})) + server_config_data = config.get("server_config", {}) + server_config = MCPServerConfig(**server_config_data) return cls( id=connector.id, diff --git a/surfsense_web/components/assistant-ui/connector-popup.tsx b/surfsense_web/components/assistant-ui/connector-popup.tsx index cde32cabe..90113707e 100644 --- a/surfsense_web/components/assistant-ui/connector-popup.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup.tsx @@ -22,6 +22,7 @@ import { useIndexingConnectors } from "./connector-popup/hooks/use-indexing-conn import { ActiveConnectorsTab } from "./connector-popup/tabs/active-connectors-tab"; import { AllConnectorsTab } from "./connector-popup/tabs/all-connectors-tab"; import { ConnectorAccountsListView } from "./connector-popup/views/connector-accounts-list-view"; +import { MCPConnectorListView } from "./connector-popup/views/mcp-connector-list-view"; import { YouTubeCrawlerView } from "./connector-popup/views/youtube-crawler-view"; export const ConnectorIndicator: FC = () => { @@ -56,6 +57,7 @@ export const ConnectorIndicator: FC = () => { frequencyMinutes, allConnectors, viewingAccountsType, + viewingMCPList, setSearchQuery, setStartDate, setEndDate, @@ -79,6 +81,8 @@ export const ConnectorIndicator: FC = () => { handleBackFromYouTube, handleViewAccountsList, handleBackFromAccountsList, + handleBackFromMCPList, + handleAddNewMCPFromList, handleQuickIndexConnector, connectorConfig, setConnectorConfig, @@ -95,15 +99,15 @@ export const ConnectorIndicator: FC = () => { refreshConnectors: refreshConnectorsElectric, } = useConnectorsElectric(searchSpaceId); - // Fallback to API if Electric fails or is not available - const connectors = - connectorsFromElectric.length > 0 || !connectorsError - ? connectorsFromElectric - : allConnectors || []; + // Fallback to API if Electric is not available or fails + // Use Electric data if: 1) we have data, or 2) still loading without error + // Use API data if: Electric failed (has error) or finished loading with no data + const useElectricData = connectorsFromElectric.length > 0 || (connectorsLoading && !connectorsError); + const connectors = useElectricData ? connectorsFromElectric : allConnectors || []; // Manual refresh function that works with both Electric and API const refreshConnectors = async () => { - if (connectorsFromElectric.length > 0 || !connectorsError) { + if (useElectricData) { await refreshConnectorsElectric(); } else { // Fallback: use allConnectors from useConnectorDialog (which uses connectorsAtom) @@ -126,7 +130,8 @@ export const ConnectorIndicator: FC = () => { const hasConnectors = connectors.length > 0; const hasSources = hasConnectors || activeDocumentTypes.length > 0; const totalSourceCount = connectors.length + activeDocumentTypes.length; - const activeConnectorsCount = connectors.length; // Only actual connectors, not document types + + const activeConnectorsCount = connectors.length; // Check which connectors are already connected // Using Electric SQL + PGlite for real-time connector updates @@ -171,6 +176,19 @@ export const ConnectorIndicator: FC = () => { {/* YouTube Crawler View - shown when adding YouTube videos */} {isYouTubeView && searchSpaceId ? ( + ) : viewingMCPList ? ( +
+ c.connector_type === "MCP_CONNECTOR" + ) as SearchSourceConnector[] + } + onAddNew={handleAddNewMCPFromList} + onManageConnector={handleStartEdit} + onBack={handleBackFromMCPList} + /> +
) : viewingAccountsType ? ( { isSaving={isSaving} isDisconnecting={isDisconnecting} isIndexing={indexingConnectorIds.has(editingConnector.id)} + searchSpaceId={searchSpaceId?.toString()} + onStartDateChange={setStartDate} onEndDateChange={setEndDate} onPeriodicEnabledChange={setPeriodicEnabled} diff --git a/surfsense_web/components/assistant-ui/connector-popup/connect-forms/components/mcp-connect-form.tsx b/surfsense_web/components/assistant-ui/connector-popup/connect-forms/components/mcp-connect-form.tsx new file mode 100644 index 000000000..a671c91e8 --- /dev/null +++ b/surfsense_web/components/assistant-ui/connector-popup/connect-forms/components/mcp-connect-form.tsx @@ -0,0 +1,229 @@ +"use client"; + +import { CheckCircle2, ChevronDown, ChevronUp, Server, XCircle } from "lucide-react"; +import { type FC, useRef, useState } from "react"; +import { Alert, AlertDescription, AlertTitle } from "@/components/ui/alert"; +import { Button } from "@/components/ui/button"; +import { Input } from "@/components/ui/input"; +import { Label } from "@/components/ui/label"; +import { Textarea } from "@/components/ui/textarea"; +import { EnumConnectorName } from "@/contracts/enums/connector"; +import type { MCPToolDefinition } from "@/contracts/types/mcp.types"; +import type { ConnectFormProps } from ".."; +import { + extractServerName, + parseMCPConfig, + testMCPConnection, + type MCPConnectionTestResult, +} from "../../utils/mcp-config-validator"; + +export const MCPConnectForm: FC = ({ onSubmit, isSubmitting }) => { + const isSubmittingRef = useRef(false); + const [configJson, setConfigJson] = useState(""); + const [jsonError, setJsonError] = useState(null); + const [isTesting, setIsTesting] = useState(false); + const [showDetails, setShowDetails] = useState(false); + const [testResult, setTestResult] = useState(null); + + const DEFAULT_CONFIG = JSON.stringify( + { + name: "My MCP Server", + command: "npx", + args: ["-y", "@modelcontextprotocol/server-filesystem", "/path/to/directory"], + env: { + API_KEY: "your_api_key_here", + }, + transport: "stdio", + }, + null, + 2 + ); + + const parseConfig = () => { + const result = parseMCPConfig(configJson); + if (result.error) { + setJsonError(result.error); + } else { + setJsonError(null); + } + return result.config; + }; + + const handleConfigChange = (value: string) => { + setConfigJson(value); + + // Clear previous error + if (jsonError) { + setJsonError(null); + } + + // Validate immediately to show errors as user types (with debouncing via parseMCPConfig cache) + if (value.trim()) { + const result = parseMCPConfig(value); + if (result.error) { + setJsonError(result.error); + } + } + }; + + const handleTestConnection = async () => { + const serverConfig = parseConfig(); + if (!serverConfig) { + setTestResult({ + status: "error", + message: jsonError || "Invalid configuration", + tools: [], + }); + return; + } + + setIsTesting(true); + setTestResult(null); + + const result = await testMCPConnection(serverConfig); + setTestResult(result); + setIsTesting(false); + }; + + const handleSubmit = async (e: React.FormEvent) => { + e.preventDefault(); + + // Prevent multiple submissions + if (isSubmittingRef.current || isSubmitting) { + return; + } + + const serverConfig = parseConfig(); + if (!serverConfig) { + return; + } + + // Extract server name from config if provided + const serverName = extractServerName(configJson); + + isSubmittingRef.current = true; + try { + await onSubmit({ + name: serverName, + connector_type: EnumConnectorName.MCP_CONNECTOR, + config: { server_config: serverConfig }, + is_indexable: false, + is_active: true, + last_indexed_at: null, + periodic_indexing_enabled: false, + indexing_frequency_minutes: null, + next_scheduled_at: null, + }); + } finally { + isSubmittingRef.current = false; + } + }; + + return ( +
+ + + + Connect to an MCP (Model Context Protocol) server. Each MCP server is added as a separate connector. + + + +
+
+
+ +