Merge pull request #370 from vaishcodescape/aditya/features

fix:chat-routes backend security vulnerability fixed
This commit is contained in:
Rohan Verma 2025-10-08 21:28:39 -07:00 committed by GitHub
commit 6d53b0c8f7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 614 additions and 205 deletions

View file

@ -16,40 +16,43 @@ from app.schemas import (
from app.tasks.stream_connector_search_results import stream_connector_search_results from app.tasks.stream_connector_search_results import stream_connector_search_results
from app.users import current_active_user from app.users import current_active_user
from app.utils.check_ownership import check_ownership from app.utils.check_ownership import check_ownership
from app.utils.validators import (
validate_search_space_id,
validate_document_ids,
validate_connectors,
validate_research_mode,
validate_search_mode,
validate_messages,
)
router = APIRouter() router = APIRouter()
@router.post("/chat") @router.post("/chat")
async def handle_chat_data( async def handle_chat_data(
request: AISDKChatRequest, request: AISDKChatRequest,
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user), user: User = Depends(current_active_user),
): ):
messages = request.messages # Validate and sanitize all input data
messages = validate_messages(request.messages)
if messages[-1]["role"] != "user": if messages[-1]["role"] != "user":
raise HTTPException( raise HTTPException(
status_code=400, detail="Last message must be a user message" status_code=400, detail="Last message must be a user message"
) )
user_query = messages[-1]["content"] user_query = messages[-1]["content"]
search_space_id = request.data.get("search_space_id")
research_mode: str = request.data.get("research_mode") # Extract and validate data from request
selected_connectors: list[str] = request.data.get("selected_connectors") request_data = request.data or {}
document_ids_to_add_in_context: list[int] = request.data.get( search_space_id = validate_search_space_id(request_data.get("search_space_id"))
"document_ids_to_add_in_context" research_mode = validate_research_mode(request_data.get("research_mode"))
) selected_connectors = validate_connectors(request_data.get("selected_connectors"))
document_ids_to_add_in_context = validate_document_ids(request_data.get("document_ids_to_add_in_context"))
search_mode_str = request.data.get("search_mode", "CHUNKS") search_mode_str = validate_search_mode(request_data.get("search_mode"))
# Convert search_space_id to integer if it's a string
if search_space_id and isinstance(search_space_id, str):
try:
search_space_id = int(search_space_id)
except ValueError:
raise HTTPException(
status_code=400, detail="Invalid search_space_id format"
) from None
# Check if the search space belongs to the current user # Check if the search space belongs to the current user
try: try:
@ -126,6 +129,25 @@ async def read_chats(
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user), user: User = Depends(current_active_user),
): ):
# Validate pagination parameters
if skip < 0:
raise HTTPException(
status_code=400,
detail="skip must be a non-negative integer"
)
if limit <= 0 or limit > 1000: # Reasonable upper limit
raise HTTPException(
status_code=400,
detail="limit must be between 1 and 1000"
)
# Validate search_space_id if provided
if search_space_id is not None and search_space_id <= 0:
raise HTTPException(
status_code=400,
detail="search_space_id must be a positive integer"
)
try: try:
# Select specific fields excluding messages # Select specific fields excluding messages
query = ( query = (

View file

@ -5,7 +5,7 @@ from typing import Any
from pydantic import BaseModel, ConfigDict, field_validator from pydantic import BaseModel, ConfigDict, field_validator
from app.db import SearchSourceConnectorType from app.db import SearchSourceConnectorType
from app.schemas.google_auth_credentials import GoogleAuthCredentialsBase from app.utils.validators import validate_connector_config
from .base import IDModel, TimestampModel from .base import IDModel, TimestampModel
@ -23,192 +23,7 @@ class SearchSourceConnectorBase(BaseModel):
cls, config: dict[str, Any], values: dict[str, Any] cls, config: dict[str, Any], values: dict[str, Any]
) -> dict[str, Any]: ) -> dict[str, Any]:
connector_type = values.data.get("connector_type") connector_type = values.data.get("connector_type")
return validate_connector_config(connector_type, config)
if connector_type == SearchSourceConnectorType.SERPER_API:
# For SERPER_API, only allow SERPER_API_KEY
allowed_keys = ["SERPER_API_KEY"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For SERPER_API connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the API key is not empty
if not config.get("SERPER_API_KEY"):
raise ValueError("SERPER_API_KEY cannot be empty")
elif connector_type == SearchSourceConnectorType.TAVILY_API:
# For TAVILY_API, only allow TAVILY_API_KEY
allowed_keys = ["TAVILY_API_KEY"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For TAVILY_API connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the API key is not empty
if not config.get("TAVILY_API_KEY"):
raise ValueError("TAVILY_API_KEY cannot be empty")
elif connector_type == SearchSourceConnectorType.LINKUP_API:
# For LINKUP_API, only allow LINKUP_API_KEY
allowed_keys = ["LINKUP_API_KEY"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For LINKUP_API connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the API key is not empty
if not config.get("LINKUP_API_KEY"):
raise ValueError("LINKUP_API_KEY cannot be empty")
elif connector_type == SearchSourceConnectorType.SLACK_CONNECTOR:
# For SLACK_CONNECTOR, only allow SLACK_BOT_TOKEN
allowed_keys = ["SLACK_BOT_TOKEN"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For SLACK_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the bot token is not empty
if not config.get("SLACK_BOT_TOKEN"):
raise ValueError("SLACK_BOT_TOKEN cannot be empty")
elif connector_type == SearchSourceConnectorType.NOTION_CONNECTOR:
# For NOTION_CONNECTOR, only allow NOTION_INTEGRATION_TOKEN
allowed_keys = ["NOTION_INTEGRATION_TOKEN"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For NOTION_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the integration token is not empty
if not config.get("NOTION_INTEGRATION_TOKEN"):
raise ValueError("NOTION_INTEGRATION_TOKEN cannot be empty")
elif connector_type == SearchSourceConnectorType.GITHUB_CONNECTOR:
# For GITHUB_CONNECTOR, only allow GITHUB_PAT and repo_full_names
allowed_keys = ["GITHUB_PAT", "repo_full_names"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For GITHUB_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the token is not empty
if not config.get("GITHUB_PAT"):
raise ValueError("GITHUB_PAT cannot be empty")
# Ensure the repo_full_names is present and is a non-empty list
repo_full_names = config.get("repo_full_names")
if not isinstance(repo_full_names, list) or not repo_full_names:
raise ValueError("repo_full_names must be a non-empty list of strings")
elif connector_type == SearchSourceConnectorType.LINEAR_CONNECTOR:
# For LINEAR_CONNECTOR, only allow LINEAR_API_KEY
allowed_keys = ["LINEAR_API_KEY"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For LINEAR_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the token is not empty
if not config.get("LINEAR_API_KEY"):
raise ValueError("LINEAR_API_KEY cannot be empty")
elif connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR:
# For DISCORD_CONNECTOR, only allow DISCORD_BOT_TOKEN
allowed_keys = ["DISCORD_BOT_TOKEN"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For DISCORD_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the bot token is not empty
if not config.get("DISCORD_BOT_TOKEN"):
raise ValueError("DISCORD_BOT_TOKEN cannot be empty")
elif connector_type == SearchSourceConnectorType.JIRA_CONNECTOR:
# For JIRA_CONNECTOR, require JIRA_EMAIL, JIRA_API_TOKEN and JIRA_BASE_URL
allowed_keys = ["JIRA_EMAIL", "JIRA_API_TOKEN", "JIRA_BASE_URL"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For JIRA_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the email is not empty
if not config.get("JIRA_EMAIL"):
raise ValueError("JIRA_EMAIL cannot be empty")
# Ensure the API token is not empty
if not config.get("JIRA_API_TOKEN"):
raise ValueError("JIRA_API_TOKEN cannot be empty")
# Ensure the base URL is not empty
if not config.get("JIRA_BASE_URL"):
raise ValueError("JIRA_BASE_URL cannot be empty")
elif connector_type == SearchSourceConnectorType.CONFLUENCE_CONNECTOR:
# For CONFLUENCE_CONNECTOR, only allow specific keys
allowed_keys = [
"CONFLUENCE_BASE_URL",
"CONFLUENCE_EMAIL",
"CONFLUENCE_API_TOKEN",
]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For CONFLUENCE_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the email is not empty
if not config.get("CONFLUENCE_EMAIL"):
raise ValueError("CONFLUENCE_EMAIL cannot be empty")
# Ensure the API token is not empty
if not config.get("CONFLUENCE_API_TOKEN"):
raise ValueError("CONFLUENCE_API_TOKEN cannot be empty")
# Ensure the base URL is not empty
if not config.get("CONFLUENCE_BASE_URL"):
raise ValueError("CONFLUENCE_BASE_URL cannot be empty")
elif connector_type == SearchSourceConnectorType.CLICKUP_CONNECTOR:
# For CLICKUP_CONNECTOR, only allow CLICKUP_API_TOKEN
allowed_keys = ["CLICKUP_API_TOKEN"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For CLICKUP_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the API token is not empty
if not config.get("CLICKUP_API_TOKEN"):
raise ValueError("CLICKUP_API_TOKEN cannot be empty")
elif connector_type == SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR:
# Required fields
required_keys = list(GoogleAuthCredentialsBase.model_fields.keys())
for key in required_keys:
if key not in config or config[key] in (None, ""):
raise ValueError(f"{key} is required and cannot be empty")
elif connector_type == SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR:
# Required fields for Gmail connector (same as Calendar - uses Google OAuth)
required_keys = list(GoogleAuthCredentialsBase.model_fields.keys())
for key in required_keys:
if key not in config or config[key] in (None, ""):
raise ValueError(f"{key} is required and cannot be empty")
elif connector_type == SearchSourceConnectorType.LUMA_CONNECTOR:
# For LUMA_CONNECTOR, only allow LUMA_API_KEY
allowed_keys = ["LUMA_API_KEY"]
if set(config.keys()) != set(allowed_keys):
raise ValueError(
f"For LUMA_CONNECTOR connector type, config must only contain these keys: {allowed_keys}"
)
# Ensure the api key is not empty
if not config.get("LUMA_API_KEY"):
raise ValueError("LUMA_API_KEY cannot be empty")
return config
class SearchSourceConnectorCreate(SearchSourceConnectorBase): class SearchSourceConnectorCreate(SearchSourceConnectorBase):

View file

@ -0,0 +1,572 @@
"""
Validation utilities for SurfSense backend.
This module contains validation functions that were previously scattered across
different modules. It leverages the pyvalidators library where applicable
to avoid rewriting common validation logic.
"""
import re
from typing import Any
import validators
from fastapi import HTTPException
def validate_search_space_id(search_space_id: Any) -> int:
"""
Validate and convert search_space_id to integer.
Args:
search_space_id: The search space ID to validate
Returns:
int: Validated search space ID
Raises:
HTTPException: If validation fails
"""
if search_space_id is None:
raise HTTPException(
status_code=400,
detail="search_space_id is required"
)
if isinstance(search_space_id, bool):
raise HTTPException(
status_code=400,
detail="search_space_id must be an integer, not a boolean"
)
if isinstance(search_space_id, int):
if search_space_id <= 0:
raise HTTPException(
status_code=400,
detail="search_space_id must be a positive integer"
)
return search_space_id
if isinstance(search_space_id, str):
# Check if it's a valid integer string
if not search_space_id.strip():
raise HTTPException(
status_code=400,
detail="search_space_id cannot be empty"
)
# Check for valid integer format (no leading zeros, no decimal points)
if not re.match(r'^[1-9]\d*$', search_space_id.strip()):
raise HTTPException(
status_code=400,
detail="search_space_id must be a valid positive integer"
)
value = int(search_space_id.strip())
# Regex already guarantees value > 0, but check retained for clarity
if value <= 0:
raise HTTPException(
status_code=400,
detail="search_space_id must be a positive integer"
)
return value
raise HTTPException(
status_code=400,
detail="search_space_id must be an integer or string representation of an integer"
)
def validate_document_ids(document_ids: Any) -> list[int]:
"""
Validate and convert document_ids to list of integers.
Args:
document_ids: The document IDs to validate
Returns:
List[int]: Validated list of document IDs
Raises:
HTTPException: If validation fails
"""
if document_ids is None:
return []
if not isinstance(document_ids, list):
raise HTTPException(
status_code=400,
detail="document_ids_to_add_in_context must be a list"
)
validated_ids = []
for i, doc_id in enumerate(document_ids):
if isinstance(doc_id, bool):
raise HTTPException(
status_code=400,
detail=f"document_ids_to_add_in_context[{i}] must be an integer, not a boolean",
)
if isinstance(doc_id, int):
if doc_id <= 0:
raise HTTPException(
status_code=400,
detail=f"document_ids_to_add_in_context[{i}] must be a positive integer"
)
validated_ids.append(doc_id)
elif isinstance(doc_id, str):
if not doc_id.strip():
raise HTTPException(
status_code=400,
detail=f"document_ids_to_add_in_context[{i}] cannot be empty"
)
if not re.match(r'^[1-9]\d*$', doc_id.strip()):
raise HTTPException(
status_code=400,
detail=f"document_ids_to_add_in_context[{i}] must be a valid positive integer"
)
value = int(doc_id.strip())
# Regex already guarantees value > 0
if value <= 0:
raise HTTPException(
status_code=400,
detail=f"document_ids_to_add_in_context[{i}] must be a positive integer"
)
validated_ids.append(value)
else:
raise HTTPException(
status_code=400,
detail=f"document_ids_to_add_in_context[{i}] must be an integer or string representation of an integer"
)
return validated_ids
def validate_connectors(connectors: Any) -> list[str]:
"""
Validate selected_connectors list.
Args:
connectors: The connectors to validate
Returns:
List[str]: Validated list of connector names
Raises:
HTTPException: If validation fails
"""
if connectors is None:
return []
if not isinstance(connectors, list):
raise HTTPException(
status_code=400,
detail="selected_connectors must be a list"
)
validated_connectors = []
for i, connector in enumerate(connectors):
if not isinstance(connector, str):
raise HTTPException(
status_code=400,
detail=f"selected_connectors[{i}] must be a string"
)
if not connector.strip():
raise HTTPException(
status_code=400,
detail=f"selected_connectors[{i}] cannot be empty"
)
trimmed = connector.strip()
if not re.fullmatch(r'[\w\-_]+', trimmed):
raise HTTPException(
status_code=400,
detail=f"selected_connectors[{i}] contains invalid characters"
)
validated_connectors.append(trimmed)
return validated_connectors
def validate_research_mode(research_mode: Any) -> str:
"""
Validate research_mode parameter.
Args:
research_mode: The research mode to validate
Returns:
str: Validated research mode
Raises:
HTTPException: If validation fails
"""
if research_mode is None:
return "QNA" # Default value
if not isinstance(research_mode, str):
raise HTTPException(
status_code=400,
detail="research_mode must be a string"
)
normalized_mode = research_mode.strip().upper()
if not normalized_mode:
raise HTTPException(
status_code=400,
detail="research_mode cannot be empty"
)
valid_modes = ["REPORT_GENERAL", "REPORT_DEEP", "REPORT_DEEPER", "QNA"]
if normalized_mode not in valid_modes:
raise HTTPException(
status_code=400,
detail=f"research_mode must be one of: {', '.join(valid_modes)}"
)
return normalized_mode
def validate_search_mode(search_mode: Any) -> str:
"""
Validate search_mode parameter.
Args:
search_mode: The search mode to validate
Returns:
str: Validated search mode
Raises:
HTTPException: If validation fails
"""
if search_mode is None:
return "CHUNKS" # Default value
if not isinstance(search_mode, str):
raise HTTPException(
status_code=400,
detail="search_mode must be a string"
)
normalized_mode = search_mode.strip().upper()
if not normalized_mode:
raise HTTPException(
status_code=400,
detail="search_mode cannot be empty"
)
valid_modes = ["CHUNKS", "DOCUMENTS"]
if normalized_mode not in valid_modes:
raise HTTPException(
status_code=400,
detail=f"search_mode must be one of: {', '.join(valid_modes)}"
)
return normalized_mode
def validate_messages(messages: Any) -> list[dict]:
"""
Validate messages structure.
Args:
messages: The messages to validate
Returns:
List[dict]: Validated messages
Raises:
HTTPException: If validation fails
"""
if not isinstance(messages, list):
raise HTTPException(
status_code=400,
detail="messages must be a list"
)
if not messages:
raise HTTPException(
status_code=400,
detail="messages cannot be empty"
)
validated_messages = []
for i, message in enumerate(messages):
if not isinstance(message, dict):
raise HTTPException(
status_code=400,
detail=f"messages[{i}] must be a dictionary"
)
if "role" not in message:
raise HTTPException(
status_code=400,
detail=f"messages[{i}] must have a 'role' field"
)
if "content" not in message:
raise HTTPException(
status_code=400,
detail=f"messages[{i}] must have a 'content' field"
)
role = message["role"]
if not isinstance(role, str) or role not in ["user", "assistant", "system"]:
raise HTTPException(
status_code=400,
detail=f"messages[{i}].role must be 'user', 'assistant', or 'system'"
)
content = message["content"]
if not isinstance(content, str):
raise HTTPException(
status_code=400,
detail=f"messages[{i}].content must be a string"
)
if not content.strip():
raise HTTPException(
status_code=400,
detail=f"messages[{i}].content cannot be empty"
)
# Trim content and enforce max length (10,000 chars)
sanitized_content = content.strip()
if len(sanitized_content) > 10000: # Reasonable limit
raise HTTPException(
status_code=400,
detail=f"messages[{i}].content is too long (max 10000 characters)"
)
validated_messages.append({
"role": role,
"content": sanitized_content
})
return validated_messages
def validate_email(email: str) -> str:
"""
Validate email address using pyvalidators library.
Args:
email: The email address to validate
Returns:
str: Validated email address
Raises:
HTTPException: If validation fails
"""
if not email or not email.strip():
raise HTTPException(
status_code=400,
detail="Email address is required"
)
email = email.strip()
if not validators.email(email):
raise HTTPException(
status_code=400,
detail="Invalid email address format"
)
return email
def validate_url(url: str) -> str:
"""
Validate URL using pyvalidators library.
Args:
url: The URL to validate
Returns:
str: Validated URL
Raises:
HTTPException: If validation fails
"""
if not url or not url.strip():
raise HTTPException(
status_code=400,
detail="URL is required"
)
url = url.strip()
if not validators.url(url):
raise HTTPException(
status_code=400,
detail="Invalid URL format"
)
return url
def validate_uuid(uuid_string: str) -> str:
"""
Validate UUID using pyvalidators library.
Args:
uuid_string: The UUID string to validate
Returns:
str: Validated UUID string
Raises:
HTTPException: If validation fails
"""
if not uuid_string or not uuid_string.strip():
raise HTTPException(
status_code=400,
detail="UUID is required"
)
uuid_string = uuid_string.strip()
if not validators.uuid(uuid_string):
raise HTTPException(
status_code=400,
detail="Invalid UUID format"
)
return uuid_string
def validate_connector_config(connector_type: str | Any, config: dict[str, Any]) -> dict[str, Any]:
"""
Validate connector configuration based on connector type.
Args:
connector_type: The type of connector (string or enum)
config: The configuration dictionary to validate
Returns:
dict: Validated configuration
Raises:
ValueError: If validation fails
"""
if not isinstance(config, dict) or isinstance(config, bool):
raise ValueError("config must be a dictionary of connector settings")
# Convert enum to string if needed
connector_type_str = str(connector_type).split('.')[-1] if hasattr(connector_type, 'value') else str(connector_type)
# Validation function helpers
def validate_email_field(key: str, connector_name: str) -> None:
if not validators.email(config.get(key, "")):
raise ValueError(f"Invalid email format for {connector_name} connector")
def validate_url_field(key: str, connector_name: str) -> None:
if not validators.url(config.get(key, "")):
raise ValueError(f"Invalid base URL format for {connector_name} connector")
def validate_list_field(key: str, field_name: str) -> None:
value = config.get(key)
if not isinstance(value, list) or not value:
raise ValueError(f"{field_name} must be a non-empty list of strings")
# Lookup table for connector validation rules
connector_rules = {
"SERPER_API": {
"required": ["SERPER_API_KEY"],
"validators": {}
},
"TAVILY_API": {
"required": ["TAVILY_API_KEY"],
"validators": {}
},
"LINKUP_API": {
"required": ["LINKUP_API_KEY"],
"validators": {}
},
"SLACK_CONNECTOR": {
"required": ["SLACK_BOT_TOKEN"],
"validators": {}
},
"NOTION_CONNECTOR": {
"required": ["NOTION_INTEGRATION_TOKEN"],
"validators": {}
},
"GITHUB_CONNECTOR": {
"required": ["GITHUB_PAT", "repo_full_names"],
"validators": {
"repo_full_names": lambda: validate_list_field("repo_full_names", "repo_full_names")
}
},
"LINEAR_CONNECTOR": {
"required": ["LINEAR_API_KEY"],
"validators": {}
},
"DISCORD_CONNECTOR": {
"required": ["DISCORD_BOT_TOKEN"],
"validators": {}
},
"JIRA_CONNECTOR": {
"required": ["JIRA_EMAIL", "JIRA_API_TOKEN", "JIRA_BASE_URL"],
"validators": {
"JIRA_EMAIL": lambda: validate_email_field("JIRA_EMAIL", "JIRA"),
"JIRA_BASE_URL": lambda: validate_url_field("JIRA_BASE_URL", "JIRA")
}
},
"CONFLUENCE_CONNECTOR": {
"required": ["CONFLUENCE_BASE_URL", "CONFLUENCE_EMAIL", "CONFLUENCE_API_TOKEN"],
"validators": {
"CONFLUENCE_EMAIL": lambda: validate_email_field("CONFLUENCE_EMAIL", "Confluence"),
"CONFLUENCE_BASE_URL": lambda: validate_url_field("CONFLUENCE_BASE_URL", "Confluence")
}
},
"CLICKUP_CONNECTOR": {
"required": ["CLICKUP_API_TOKEN"],
"validators": {}
},
"GOOGLE_CALENDAR_CONNECTOR": {
"required": ["token", "refresh_token", "token_uri", "client_id", "expiry", "scopes", "client_secret"],
"validators": {},
"allow_none_or_empty": False # Special flag for Google connectors
},
"GOOGLE_GMAIL_CONNECTOR": {
"required": ["token", "refresh_token", "token_uri", "client_id", "expiry", "scopes", "client_secret"],
"validators": {},
"allow_none_or_empty": False
},
"AIRTABLE_CONNECTOR": {
"required": ["AIRTABLE_API_KEY", "AIRTABLE_BASE_ID"],
"validators": {}
},
"LUMA_CONNECTOR": {
"required": ["LUMA_API_KEY"],
"validators": {}
}
}
rules = connector_rules.get(connector_type_str)
if not rules:
return config # Unknown connector type, pass through
# Validate required keys match exactly
if set(config.keys()) != set(rules["required"]):
raise ValueError(
f"For {connector_type_str} connector type, config must only contain these keys: {rules['required']}"
)
# Apply custom validators first (these check format before emptiness)
for validator_func in rules["validators"].values():
validator_func()
# Validate each field is not empty
for key in rules["required"]:
# Special handling for Google connectors that don't allow None or empty strings
if rules.get("allow_none_or_empty") is False:
if key not in config or config[key] in (None, ""):
raise ValueError(f"{key} is required and cannot be empty")
else:
# Standard check: field must have a truthy value
if not config.get(key):
raise ValueError(f"{key} cannot be empty")
return config