diff --git a/surfsense_backend/app/connectors/notion_history.py b/surfsense_backend/app/connectors/notion_history.py index a79168fdf..def86d721 100644 --- a/surfsense_backend/app/connectors/notion_history.py +++ b/surfsense_backend/app/connectors/notion_history.py @@ -1,4 +1,7 @@ +import asyncio import logging +from collections.abc import Awaitable, Callable +from typing import Any, TypeVar from notion_client import AsyncClient from notion_client.errors import APIResponseError @@ -13,6 +16,32 @@ from app.utils.oauth_security import TokenEncryption logger = logging.getLogger(__name__) +# Type variable for generic return type +T = TypeVar("T") + +# ============================================================================ +# Retry Configuration (per Notion API docs) +# https://developers.notion.com/reference/request-limits +# https://developers.notion.com/reference/status-codes +# ============================================================================ +MAX_RETRIES = 5 +BASE_RETRY_DELAY = 1.0 # seconds +MAX_RETRY_DELAY = 60.0 # seconds (Notion's max request timeout) + +# Type alias for retry callback function +# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None +# retry_reason: 'rate_limit', 'server_error', 'timeout' +# This callback can be used to update notifications during retries +RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]] + +# HTTP status codes that should trigger a retry +# 429: rate_limited - Use Retry-After header +# 500: internal_server_error - Unexpected error +# 502: bad_gateway - Failed upstream connection +# 503: service_unavailable - Notion unavailable or timeout +# 504: gateway_timeout - Notion timed out +RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504}) + # Known unsupported block types that Notion API doesn't expose # These will be skipped gracefully instead of failing the entire sync UNSUPPORTED_BLOCK_TYPE_ERRORS = [ @@ -46,6 +75,24 @@ class NotionHistoryConnector: self._notion_client: AsyncClient | None = None # Track pages with skipped unsupported content (for user notifications) self._pages_with_skipped_content: list[str] = [] + # Optional callback to notify about retry progress (for user notifications) + self._on_retry_callback: RetryCallbackType | None = None + + def set_retry_callback(self, callback: RetryCallbackType | None) -> None: + """ + Set a callback function to be called when API calls are retried. + + This allows the indexer to receive notifications about rate limits + and other transient errors, which can be used to update user-facing + notifications. + + Args: + callback: Async function with signature: + callback(retry_reason, attempt, max_attempts, wait_seconds) -> None + retry_reason: 'rate_limit', 'server_error', or 'timeout' + Set to None to disable callbacks. + """ + self._on_retry_callback = callback async def _get_valid_token(self) -> str: """ @@ -171,6 +218,120 @@ class NotionHistoryConnector: self._notion_client = AsyncClient(auth=token) return self._notion_client + async def _api_call_with_retry( + self, + api_func: Callable[..., Awaitable[T]], + *args: Any, + on_retry: RetryCallbackType | None = None, + **kwargs: Any, + ) -> T: + """ + Execute Notion API call with retry logic and exponential backoff. + + Handles retryable errors per Notion API documentation: + - 429 rate_limited: Uses Retry-After header value + - 500 internal_server_error: Retries with exponential backoff + - 502 bad_gateway: Retries with exponential backoff + - 503 service_unavailable: Retries with exponential backoff + - 504 gateway_timeout: Retries with exponential backoff + + Args: + api_func: The async Notion API function to call + *args: Positional arguments to pass to the API function + on_retry: Optional callback to notify about retry progress. + Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) + retry_reason is one of: 'rate_limit', 'server_error', 'timeout' + **kwargs: Keyword arguments to pass to the API function + + Returns: + The result from the API call + + Raises: + APIResponseError: If all retries are exhausted or error is not retryable + """ + last_exception: APIResponseError | None = None + retry_delay = BASE_RETRY_DELAY + + for attempt in range(MAX_RETRIES): + try: + return await api_func(*args, **kwargs) + + except APIResponseError as e: + last_exception = e + + # Check if this error is retryable + if e.status not in RETRYABLE_STATUS_CODES: + # Not retryable (e.g., 400, 401, 403, 404) - raise immediately + raise + + # Check if we've exhausted retries + if attempt == MAX_RETRIES - 1: + logger.error( + f"Notion API call failed after {MAX_RETRIES} retries. " + f"Last error: {e.status} {e.code}" + ) + raise + + # Determine retry reason and wait time based on status code + if e.status == 429: + # Rate limited - use Retry-After header if available + retry_reason = "rate_limit" + retry_after = e.headers.get("Retry-After") if e.headers else None + if retry_after: + try: + wait_time = float(retry_after) + except (ValueError, TypeError): + wait_time = retry_delay + else: + wait_time = retry_delay + logger.warning( + f"Notion API rate limited (429). " + f"Waiting {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}" + ) + elif e.status == 504: + # Gateway timeout + retry_reason = "timeout" + wait_time = min(retry_delay, MAX_RETRY_DELAY) + logger.warning( + f"Notion API timeout ({e.status}). " + f"Retrying in {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}" + ) + else: + # Server error (500/502/503) - use exponential backoff + retry_reason = "server_error" + wait_time = min(retry_delay, MAX_RETRY_DELAY) + logger.warning( + f"Notion API error {e.status} ({e.code}). " + f"Retrying in {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}" + ) + + # Notify about retry via callback (for user notifications) + # Call before sleeping so user sees the message while we wait + if on_retry: + try: + await on_retry( + retry_reason, + attempt + 1, # 1-based for display + MAX_RETRIES, + wait_time, + ) + except Exception as callback_error: + # Don't let callback errors break the retry logic + logger.warning( + f"Retry callback failed: {callback_error}" + ) + + # Wait before retrying + await asyncio.sleep(wait_time) + + # Exponential backoff for next attempt + retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY) + + # This should not be reached, but just in case + if last_exception: + raise last_exception + raise RuntimeError("Unexpected state in retry logic") + async def close(self): """Close the async client connection.""" if self._notion_client: @@ -228,7 +389,7 @@ class NotionHistoryConnector: # Build the filter for the search # Note: Notion API requires specific filter structure - search_params = {} + search_params: dict[str, Any] = {} # Filter for pages only (not databases) search_params["filter"] = {"value": "page", "property": "object"} @@ -259,7 +420,10 @@ class NotionHistoryConnector: if cursor: search_params["start_cursor"] = cursor - search_results = await notion.search(**search_params) + # Use retry wrapper for search API call + search_results = await self._api_call_with_retry( + notion.search, on_retry=self._on_retry_callback, **search_params + ) pages.extend(search_results["results"]) has_more = search_results.get("has_more", False) @@ -338,12 +502,20 @@ class NotionHistoryConnector: # Paginate through all blocks while has_more: try: + # Use retry wrapper for blocks.children.list API call if cursor: - response = await notion.blocks.children.list( - block_id=page_id, start_cursor=cursor + response = await self._api_call_with_retry( + notion.blocks.children.list, + on_retry=self._on_retry_callback, + block_id=page_id, + start_cursor=cursor, ) else: - response = await notion.blocks.children.list(block_id=page_id) + response = await self._api_call_with_retry( + notion.blocks.children.list, + on_retry=self._on_retry_callback, + block_id=page_id, + ) blocks.extend(response["results"]) has_more = response["has_more"] @@ -372,7 +544,7 @@ class NotionHistoryConnector: ) has_more = False continue - # Re-raise other API errors + # Re-raise other API errors (after retry exhaustion) raise if skipped_blocks_count > 0: @@ -432,9 +604,11 @@ class NotionHistoryConnector: if has_children: try: - # Fetch and process child blocks - children_response = await notion.blocks.children.list( - block_id=block_id + # Use retry wrapper for blocks.children.list API call + children_response = await self._api_call_with_retry( + notion.blocks.children.list, + on_retry=self._on_retry_callback, + block_id=block_id, ) for child_block in children_response["results"]: processed_child, child_had_skips = await self.process_block( @@ -461,7 +635,7 @@ class NotionHistoryConnector: ) # Continue without children else: - # Re-raise other API errors + # Re-raise other API errors (after retry exhaustion) raise return ( diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 191c6f954..535f579a5 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1129,6 +1129,7 @@ async def _run_indexing_with_notifications( end_date: str, indexing_function, update_timestamp_func=None, + supports_retry_callback: bool = False, ): """ Generic helper to run indexing with real-time notifications. @@ -1142,10 +1143,14 @@ async def _run_indexing_with_notifications( end_date: End date for indexing indexing_function: Async function that performs the indexing update_timestamp_func: Optional function to update connector timestamp + supports_retry_callback: Whether the indexing function supports on_retry_callback """ from uuid import UUID notification = None + # Track indexed count for retry notifications + current_indexed_count = 0 + try: # Get connector info for notification connector_result = await session.execute( @@ -1179,16 +1184,47 @@ async def _run_indexing_with_notifications( stage="fetching", ) + # Create retry callback for connectors that support it + async def on_retry_callback( + retry_reason: str, attempt: int, max_attempts: int, wait_seconds: float + ) -> None: + """Callback to update notification during API retries (rate limits, etc.)""" + nonlocal notification + if notification: + try: + await session.refresh(notification) + await NotificationService.connector_indexing.notify_retry_progress( + session=session, + notification=notification, + indexed_count=current_indexed_count, + retry_reason=retry_reason, + attempt=attempt, + max_attempts=max_attempts, + wait_seconds=wait_seconds, + ) + await session.commit() + except Exception as e: + # Don't let notification errors break the indexing + logger.warning(f"Failed to update retry notification: {e}") + + # Build kwargs for indexing function + indexing_kwargs = { + "session": session, + "connector_id": connector_id, + "search_space_id": search_space_id, + "user_id": user_id, + "start_date": start_date, + "end_date": end_date, + "update_last_indexed": False, + } + + # Add retry callback for connectors that support it + if supports_retry_callback: + indexing_kwargs["on_retry_callback"] = on_retry_callback + # Run the indexing function - documents_processed, error_or_warning = await indexing_function( - session=session, - connector_id=connector_id, - search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=False, - ) + documents_processed, error_or_warning = await indexing_function(**indexing_kwargs) + current_indexed_count = documents_processed # Update connector timestamp if function provided and indexing was successful if documents_processed > 0 and update_timestamp_func: @@ -1362,6 +1398,7 @@ async def run_notion_indexing_with_new_session( end_date=end_date, indexing_function=index_notion_pages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_retry_callback=True, # Notion connector supports retry notifications ) @@ -1393,6 +1430,7 @@ async def run_notion_indexing( end_date=end_date, indexing_function=index_notion_pages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_retry_callback=True, # Notion connector supports retry notifications ) diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 04f39d8ef..6a3db566b 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -329,6 +329,92 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): metadata_updates=metadata_updates, ) + async def notify_retry_progress( + self, + session: AsyncSession, + notification: Notification, + indexed_count: int, + retry_reason: str, + attempt: int, + max_attempts: int, + wait_seconds: float | None = None, + service_name: str | None = None, + ) -> Notification: + """ + Update notification when a connector is retrying due to rate limits or errors. + + This method provides user-friendly feedback when external service limitations + (rate limits, temporary outages) cause delays. Users see that the delay is + not our fault and the sync is still progressing. + + This method can be used by ANY connector (Notion, Slack, Airtable, etc.) + when they hit rate limits or transient errors. + + Args: + session: Database session + notification: Notification to update + indexed_count: Number of items indexed so far + retry_reason: Reason for retry ('rate_limit', 'server_error', 'timeout') + attempt: Current retry attempt number (1-based) + max_attempts: Maximum number of retry attempts + wait_seconds: Seconds to wait before retry (optional, for display) + service_name: Name of the external service (e.g., 'Notion', 'Slack') + If not provided, extracts from notification metadata + + Returns: + Updated notification + """ + # Get service name from notification if not provided + if not service_name: + service_name = notification.notification_metadata.get( + "connector_name", "Service" + ) + # Extract just the service name if it's "Notion - My Workspace" + if " - " in service_name: + service_name = service_name.split(" - ")[0] + + # User-friendly messages for different retry reasons + # These make it clear the delay is due to the external service, not SurfSense + retry_messages = { + "rate_limit": f"{service_name} rate limit reached", + "server_error": f"{service_name} is slow to respond", + "timeout": f"{service_name} took too long", + "temporary_error": f"{service_name} temporarily unavailable", + } + + base_message = retry_messages.get( + retry_reason, f"Waiting for {service_name}" + ) + + # Add wait time and progress info + if wait_seconds and wait_seconds > 5: + # Only show wait time if it's significant + message = f"{base_message}. Retrying in {int(wait_seconds)}s..." + else: + message = f"{base_message}. Retrying..." + + # Add progress count if we have any + if indexed_count > 0: + item_text = "item" if indexed_count == 1 else "items" + message = f"{message} ({indexed_count} {item_text} synced so far)" + + metadata_updates = { + "indexed_count": indexed_count, + "sync_stage": "waiting_retry", + "retry_attempt": attempt, + "retry_max_attempts": max_attempts, + "retry_reason": retry_reason, + "retry_wait_seconds": wait_seconds, + } + + return await self.update_notification( + session=session, + notification=notification, + message=message, + status="in_progress", + metadata_updates=metadata_updates, + ) + async def notify_indexing_completed( self, session: AsyncSession, diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index eee668198..b2ab37685 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -2,6 +2,7 @@ Notion connector indexer. """ +from collections.abc import Awaitable, Callable from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -28,6 +29,10 @@ from .base import ( update_connector_last_indexed, ) +# Type alias for retry callback +# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None +RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]] + async def index_notion_pages( session: AsyncSession, @@ -37,6 +42,7 @@ async def index_notion_pages( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_retry_callback: RetryCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Notion pages from all accessible pages. @@ -49,6 +55,9 @@ async def index_notion_pages( start_date: Start date for indexing (YYYY-MM-DD format) end_date: End date for indexing (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_retry_callback: Optional callback for retry progress notifications. + Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) + retry_reason is one of: 'rate_limit', 'server_error', 'timeout' Returns: Tuple containing (number of documents indexed, error message or None) @@ -138,6 +147,10 @@ async def index_notion_pages( session=session, connector_id=connector_id ) + # Set retry callback if provided (for user notifications during rate limits) + if on_retry_callback: + notion_client.set_retry_callback(on_retry_callback) + logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}") await task_logger.log_task_progress(