diff --git a/surfsense_backend/app/utils/validators.py b/surfsense_backend/app/utils/validators.py index 58e13df39..32b406b9d 100644 --- a/surfsense_backend/app/utils/validators.py +++ b/surfsense_backend/app/utils/validators.py @@ -62,19 +62,14 @@ def validate_search_space_id(search_space_id: Any) -> int: detail="search_space_id must be a valid positive integer" ) - try: - value = int(search_space_id.strip()) - if value <= 0: - raise HTTPException( - status_code=400, - detail="search_space_id must be a positive integer" - ) - return value - except ValueError: + value = int(search_space_id.strip()) + # Regex already guarantees value > 0, but check retained for clarity + if value <= 0: raise HTTPException( status_code=400, - detail="search_space_id must be a valid integer" - ) from None + detail="search_space_id must be a positive integer" + ) + return value raise HTTPException( status_code=400, @@ -132,19 +127,14 @@ def validate_document_ids(document_ids: Any) -> list[int]: detail=f"document_ids_to_add_in_context[{i}] must be a valid positive integer" ) - try: - value = int(doc_id.strip()) - if value <= 0: - raise HTTPException( - status_code=400, - detail=f"document_ids_to_add_in_context[{i}] must be a positive integer" - ) - validated_ids.append(value) - except ValueError: + value = int(doc_id.strip()) + # Regex already guarantees value > 0 + if value <= 0: raise HTTPException( status_code=400, - detail=f"document_ids_to_add_in_context[{i}] must be a valid integer" - ) from None + detail=f"document_ids_to_add_in_context[{i}] must be a positive integer" + ) + validated_ids.append(value) else: raise HTTPException( status_code=400, @@ -340,7 +330,7 @@ def validate_messages(messages: Any) -> list[dict]: detail=f"messages[{i}].content cannot be empty" ) - # Basic content sanitization + # Trim content and enforce max length (10,000 chars) sanitized_content = content.strip() if len(sanitized_content) > 10000: # Reasonable limit raise HTTPException( @@ -466,221 +456,117 @@ def validate_connector_config(connector_type: str | Any, config: dict[str, Any]) # Convert enum to string if needed connector_type_str = str(connector_type).split('.')[-1] if hasattr(connector_type, 'value') else str(connector_type) - if connector_type_str == "SERPER_API": - # For SERPER_API, only allow SERPER_API_KEY - allowed_keys = ["SERPER_API_KEY"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For SERPER_API connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the API key is not empty - if not config.get("SERPER_API_KEY"): - raise ValueError("SERPER_API_KEY cannot be empty") - - elif connector_type_str == "TAVILY_API": - # For TAVILY_API, only allow TAVILY_API_KEY - allowed_keys = ["TAVILY_API_KEY"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For TAVILY_API connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the API key is not empty - if not config.get("TAVILY_API_KEY"): - raise ValueError("TAVILY_API_KEY cannot be empty") - - elif connector_type_str == "LINKUP_API": - # For LINKUP_API, only allow LINKUP_API_KEY - allowed_keys = ["LINKUP_API_KEY"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For LINKUP_API connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the API key is not empty - if not config.get("LINKUP_API_KEY"): - raise ValueError("LINKUP_API_KEY cannot be empty") - - elif connector_type_str == "SLACK_CONNECTOR": - # For SLACK_CONNECTOR, only allow SLACK_BOT_TOKEN - allowed_keys = ["SLACK_BOT_TOKEN"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For SLACK_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the bot token is not empty - if not config.get("SLACK_BOT_TOKEN"): - raise ValueError("SLACK_BOT_TOKEN cannot be empty") - - elif connector_type_str == "NOTION_CONNECTOR": - # For NOTION_CONNECTOR, only allow NOTION_INTEGRATION_TOKEN - allowed_keys = ["NOTION_INTEGRATION_TOKEN"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For NOTION_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the integration token is not empty - if not config.get("NOTION_INTEGRATION_TOKEN"): - raise ValueError("NOTION_INTEGRATION_TOKEN cannot be empty") - - elif connector_type_str == "GITHUB_CONNECTOR": - # For GITHUB_CONNECTOR, only allow GITHUB_PAT and repo_full_names - allowed_keys = ["GITHUB_PAT", "repo_full_names"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For GITHUB_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the token is not empty - if not config.get("GITHUB_PAT"): - raise ValueError("GITHUB_PAT cannot be empty") - - # Ensure the repo_full_names is present and is a non-empty list - repo_full_names = config.get("repo_full_names") - if not isinstance(repo_full_names, list) or not repo_full_names: - raise ValueError("repo_full_names must be a non-empty list of strings") - - elif connector_type_str == "LINEAR_CONNECTOR": - # For LINEAR_CONNECTOR, only allow LINEAR_API_KEY - allowed_keys = ["LINEAR_API_KEY"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For LINEAR_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the token is not empty - if not config.get("LINEAR_API_KEY"): - raise ValueError("LINEAR_API_KEY cannot be empty") - - elif connector_type_str == "DISCORD_CONNECTOR": - # For DISCORD_CONNECTOR, only allow DISCORD_BOT_TOKEN - allowed_keys = ["DISCORD_BOT_TOKEN"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For DISCORD_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the bot token is not empty - if not config.get("DISCORD_BOT_TOKEN"): - raise ValueError("DISCORD_BOT_TOKEN cannot be empty") - - elif connector_type_str == "JIRA_CONNECTOR": - # For JIRA_CONNECTOR, require JIRA_EMAIL, JIRA_API_TOKEN and JIRA_BASE_URL - allowed_keys = ["JIRA_EMAIL", "JIRA_API_TOKEN", "JIRA_BASE_URL"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For JIRA_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Validate email format - if not validators.email(config.get("JIRA_EMAIL", "")): - raise ValueError("Invalid email format for JIRA connector") - - # Validate URL format - if not validators.url(config.get("JIRA_BASE_URL", "")): - raise ValueError("Invalid base URL format for JIRA connector") - - # Ensure the email is not empty - if not config.get("JIRA_EMAIL"): - raise ValueError("JIRA_EMAIL cannot be empty") - - # Ensure the API token is not empty - if not config.get("JIRA_API_TOKEN"): - raise ValueError("JIRA_API_TOKEN cannot be empty") - - # Ensure the base URL is not empty - if not config.get("JIRA_BASE_URL"): - raise ValueError("JIRA_BASE_URL cannot be empty") - - elif connector_type_str == "CONFLUENCE_CONNECTOR": - # For CONFLUENCE_CONNECTOR, only allow specific keys - allowed_keys = [ - "CONFLUENCE_BASE_URL", - "CONFLUENCE_EMAIL", - "CONFLUENCE_API_TOKEN", - ] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For CONFLUENCE_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Validate email format - if not validators.email(config.get("CONFLUENCE_EMAIL", "")): - raise ValueError("Invalid email format for Confluence connector") - - # Validate URL format - if not validators.url(config.get("CONFLUENCE_BASE_URL", "")): - raise ValueError("Invalid base URL format for Confluence connector") - - # Ensure the email is not empty - if not config.get("CONFLUENCE_EMAIL"): - raise ValueError("CONFLUENCE_EMAIL cannot be empty") - - # Ensure the API token is not empty - if not config.get("CONFLUENCE_API_TOKEN"): - raise ValueError("CONFLUENCE_API_TOKEN cannot be empty") - - # Ensure the base URL is not empty - if not config.get("CONFLUENCE_BASE_URL"): - raise ValueError("CONFLUENCE_BASE_URL cannot be empty") - - elif connector_type_str == "CLICKUP_CONNECTOR": - # For CLICKUP_CONNECTOR, only allow CLICKUP_API_TOKEN - allowed_keys = ["CLICKUP_API_TOKEN"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For CLICKUP_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the API token is not empty - if not config.get("CLICKUP_API_TOKEN"): - raise ValueError("CLICKUP_API_TOKEN cannot be empty") - - elif connector_type_str == "GOOGLE_CALENDAR_CONNECTOR": - # Required fields for Google Calendar connector - required_keys = ["token", "refresh_token", "token_uri", "client_id", "expiry", "scopes", "client_secret"] - - for key in required_keys: + # Validation function helpers + def validate_email_field(key: str, connector_name: str) -> None: + if not validators.email(config.get(key, "")): + raise ValueError(f"Invalid email format for {connector_name} connector") + + def validate_url_field(key: str, connector_name: str) -> None: + if not validators.url(config.get(key, "")): + raise ValueError(f"Invalid base URL format for {connector_name} connector") + + def validate_list_field(key: str, field_name: str) -> None: + value = config.get(key) + if not isinstance(value, list) or not value: + raise ValueError(f"{field_name} must be a non-empty list of strings") + + # Lookup table for connector validation rules + connector_rules = { + "SERPER_API": { + "required": ["SERPER_API_KEY"], + "validators": {} + }, + "TAVILY_API": { + "required": ["TAVILY_API_KEY"], + "validators": {} + }, + "LINKUP_API": { + "required": ["LINKUP_API_KEY"], + "validators": {} + }, + "SLACK_CONNECTOR": { + "required": ["SLACK_BOT_TOKEN"], + "validators": {} + }, + "NOTION_CONNECTOR": { + "required": ["NOTION_INTEGRATION_TOKEN"], + "validators": {} + }, + "GITHUB_CONNECTOR": { + "required": ["GITHUB_PAT", "repo_full_names"], + "validators": { + "repo_full_names": lambda: validate_list_field("repo_full_names", "repo_full_names") + } + }, + "LINEAR_CONNECTOR": { + "required": ["LINEAR_API_KEY"], + "validators": {} + }, + "DISCORD_CONNECTOR": { + "required": ["DISCORD_BOT_TOKEN"], + "validators": {} + }, + "JIRA_CONNECTOR": { + "required": ["JIRA_EMAIL", "JIRA_API_TOKEN", "JIRA_BASE_URL"], + "validators": { + "JIRA_EMAIL": lambda: validate_email_field("JIRA_EMAIL", "JIRA"), + "JIRA_BASE_URL": lambda: validate_url_field("JIRA_BASE_URL", "JIRA") + } + }, + "CONFLUENCE_CONNECTOR": { + "required": ["CONFLUENCE_BASE_URL", "CONFLUENCE_EMAIL", "CONFLUENCE_API_TOKEN"], + "validators": { + "CONFLUENCE_EMAIL": lambda: validate_email_field("CONFLUENCE_EMAIL", "Confluence"), + "CONFLUENCE_BASE_URL": lambda: validate_url_field("CONFLUENCE_BASE_URL", "Confluence") + } + }, + "CLICKUP_CONNECTOR": { + "required": ["CLICKUP_API_TOKEN"], + "validators": {} + }, + "GOOGLE_CALENDAR_CONNECTOR": { + "required": ["token", "refresh_token", "token_uri", "client_id", "expiry", "scopes", "client_secret"], + "validators": {}, + "allow_none_or_empty": False # Special flag for Google connectors + }, + "GOOGLE_GMAIL_CONNECTOR": { + "required": ["token", "refresh_token", "token_uri", "client_id", "expiry", "scopes", "client_secret"], + "validators": {}, + "allow_none_or_empty": False + }, + "AIRTABLE_CONNECTOR": { + "required": ["AIRTABLE_API_KEY", "AIRTABLE_BASE_ID"], + "validators": {} + }, + "LUMA_CONNECTOR": { + "required": ["LUMA_API_KEY"], + "validators": {} + } + } + + rules = connector_rules.get(connector_type_str) + if not rules: + return config # Unknown connector type, pass through + + # Validate required keys match exactly + if set(config.keys()) != set(rules["required"]): + raise ValueError( + f"For {connector_type_str} connector type, config must only contain these keys: {rules['required']}" + ) + + # Apply custom validators first (these check format before emptiness) + for validator_func in rules["validators"].values(): + validator_func() + + # Validate each field is not empty + for key in rules["required"]: + # Special handling for Google connectors that don't allow None or empty strings + if rules.get("allow_none_or_empty") is False: if key not in config or config[key] in (None, ""): raise ValueError(f"{key} is required and cannot be empty") - - elif connector_type_str == "GOOGLE_GMAIL_CONNECTOR": - # Required fields for Gmail connector (same as Calendar - uses Google OAuth) - required_keys = ["token", "refresh_token", "token_uri", "client_id", "expiry", "scopes", "client_secret"] - - for key in required_keys: - if key not in config or config[key] in (None, ""): - raise ValueError(f"{key} is required and cannot be empty") - - elif connector_type_str == "AIRTABLE_CONNECTOR": - # For AIRTABLE_CONNECTOR, only allow AIRTABLE_API_KEY and AIRTABLE_BASE_ID - allowed_keys = ["AIRTABLE_API_KEY", "AIRTABLE_BASE_ID"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For AIRTABLE_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the API key is not empty - if not config.get("AIRTABLE_API_KEY"): - raise ValueError("AIRTABLE_API_KEY cannot be empty") - - # Ensure the base ID is not empty - if not config.get("AIRTABLE_BASE_ID"): - raise ValueError("AIRTABLE_BASE_ID cannot be empty") - - elif connector_type_str == "LUMA_CONNECTOR": - # For LUMA_CONNECTOR, only allow LUMA_API_KEY - allowed_keys = ["LUMA_API_KEY"] - if set(config.keys()) != set(allowed_keys): - raise ValueError( - f"For LUMA_CONNECTOR connector type, config must only contain these keys: {allowed_keys}" - ) - - # Ensure the API key is not empty - if not config.get("LUMA_API_KEY"): - raise ValueError("LUMA_API_KEY cannot be empty") - + else: + # Standard check: field must have a truthy value + if not config.get(key): + raise ValueError(f"{key} cannot be empty") + return config