diff --git a/surfsense_backend/app/connectors/notion_history.py b/surfsense_backend/app/connectors/notion_history.py index e38218a6e..b347eb9c6 100644 --- a/surfsense_backend/app/connectors/notion_history.py +++ b/surfsense_backend/app/connectors/notion_history.py @@ -1,6 +1,10 @@ +import asyncio import logging +from collections.abc import Awaitable, Callable +from typing import Any, TypeVar from notion_client import AsyncClient +from notion_client.errors import APIResponseError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select @@ -12,6 +16,43 @@ from app.utils.oauth_security import TokenEncryption logger = logging.getLogger(__name__) +# Type variable for generic return type +T = TypeVar("T") + +# ============================================================================ +# Retry Configuration (per Notion API docs) +# https://developers.notion.com/reference/request-limits +# https://developers.notion.com/reference/status-codes +# ============================================================================ +MAX_RETRIES = 5 +BASE_RETRY_DELAY = 1.0 # seconds +MAX_RETRY_DELAY = 60.0 # seconds (Notion's max request timeout) + +# Type alias for retry callback function +# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None +# retry_reason: 'rate_limit', 'server_error', 'timeout' +# This callback can be used to update notifications during retries +RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]] + +# HTTP status codes that should trigger a retry +# 429: rate_limited - Use Retry-After header +# 500: internal_server_error - Unexpected error +# 502: bad_gateway - Failed upstream connection +# 503: service_unavailable - Notion unavailable or timeout +# 504: gateway_timeout - Notion timed out +RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504}) + +# Known unsupported block types that Notion API doesn't expose +# These will be skipped gracefully instead of failing the entire sync 
+UNSUPPORTED_BLOCK_TYPE_ERRORS = [ + "transcription is not supported", + "ai_block is not supported", + "is not supported via the API", +] + +# Known unsupported block types to check before API calls +UNSUPPORTED_BLOCK_TYPES = ["transcription", "ai_block"] + class NotionHistoryConnector: def __init__( @@ -32,6 +73,28 @@ class NotionHistoryConnector: self._connector_id = connector_id self._credentials = credentials self._notion_client: AsyncClient | None = None + # Track pages with skipped unsupported content (for user notifications) + self._pages_with_skipped_content: list[str] = [] + # Optional callback to notify about retry progress (for user notifications) + self._on_retry_callback: RetryCallbackType | None = None + # Track if using legacy integration token (for upgrade notification) + self._using_legacy_token: bool = False + + def set_retry_callback(self, callback: RetryCallbackType | None) -> None: + """ + Set a callback function to be called when API calls are retried. + + This allows the indexer to receive notifications about rate limits + and other transient errors, which can be used to update user-facing + notifications. + + Args: + callback: Async function with signature: + callback(retry_reason, attempt, max_attempts, wait_seconds) -> None + retry_reason: 'rate_limit', 'server_error', or 'timeout' + Set to None to disable callbacks. + """ + self._on_retry_callback = callback async def _get_valid_token(self) -> str: """ @@ -58,6 +121,18 @@ class NotionHistoryConnector: config_data = connector.config.copy() + # Check for legacy integration token format first + # (for connectors created before OAuth was implemented) + legacy_token = config_data.get("NOTION_INTEGRATION_TOKEN") + raw_access_token = config_data.get("access_token") + + # Validate that we have some form of token + if not raw_access_token and not legacy_token: + raise ValueError( + "Notion integration not properly connected. " + "Please remove and re-add the Notion connector." 
+ ) + # Decrypt credentials if they are encrypted token_encrypted = config_data.get("_token_encrypted", False) if token_encrypted and config.SECRET_KEY: @@ -82,13 +157,38 @@ class NotionHistoryConnector: f"Failed to decrypt Notion credentials for connector {self._connector_id}: {e!s}" ) raise ValueError( - f"Failed to decrypt Notion credentials: {e!s}" + "Notion credentials could not be decrypted. " + "Please remove and re-add the Notion connector." ) from e + # Handle legacy format: convert NOTION_INTEGRATION_TOKEN to access_token + if not config_data.get("access_token") and legacy_token: + config_data["access_token"] = legacy_token + self._using_legacy_token = True + logger.info( + f"Using legacy NOTION_INTEGRATION_TOKEN for connector {self._connector_id}" + ) + + # Final validation: ensure we have a valid access_token after all processing + final_token = config_data.get("access_token") + if not final_token or (isinstance(final_token, str) and not final_token.strip()): + raise ValueError( + "Notion access token is invalid or empty. " + "Please remove and re-add the Notion connector." + ) + try: self._credentials = NotionAuthCredentialsBase.from_dict(config_data) + except KeyError as e: + raise ValueError( + f"Notion credentials are incomplete (missing {e}). " + "Please reconnect your Notion account." + ) from e except Exception as e: - raise ValueError(f"Invalid Notion credentials: {e!s}") from e + raise ValueError( + f"Notion credentials format error: {e!s}. " + "Please reconnect your Notion account." 
+                ) from e
 
         # Check if token is expired and refreshable
         if self._credentials.is_expired and self._credentials.is_refreshable:
@@ -157,12 +257,163 @@ class NotionHistoryConnector:
             self._notion_client = AsyncClient(auth=token)
         return self._notion_client
 
+    async def _api_call_with_retry(
+        self,
+        api_func: Callable[..., Awaitable[T]],
+        *args: Any,
+        on_retry: RetryCallbackType | None = None,
+        **kwargs: Any,
+    ) -> T:
+        """
+        Execute Notion API call with retry logic and exponential backoff.
+
+        Handles retryable errors per Notion API documentation:
+        - 429 rate_limited: Uses Retry-After header value, clamped to [0, MAX_RETRY_DELAY]
+        - 500 internal_server_error: Retries with exponential backoff
+        - 502 bad_gateway: Retries with exponential backoff
+        - 503 service_unavailable: Retries with exponential backoff
+        - 504 gateway_timeout: Retries with exponential backoff
+
+        Args:
+            api_func: The async Notion API function to call
+            *args: Positional arguments to pass to the API function
+            on_retry: Optional callback to notify about retry progress.
+                Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds)
+                retry_reason is one of: 'rate_limit', 'server_error', 'timeout'
+            **kwargs: Keyword arguments to pass to the API function
+
+        Returns:
+            The result from the API call
+
+        Raises:
+            APIResponseError: If all retries are exhausted or error is not retryable
+        """
+        last_exception: APIResponseError | None = None
+        retry_delay = BASE_RETRY_DELAY
+
+        for attempt in range(MAX_RETRIES):
+            try:
+                return await api_func(*args, **kwargs)
+
+            except APIResponseError as e:
+                last_exception = e
+
+                # Check if this error is retryable
+                if e.status not in RETRYABLE_STATUS_CODES:
+                    # Not retryable (e.g., 400, 401, 403, 404) - raise immediately
+                    raise
+
+                # Check if we've exhausted all attempts (MAX_RETRIES counts attempts)
+                if attempt == MAX_RETRIES - 1:
+                    logger.error(
+                        f"Notion API call failed after {MAX_RETRIES} attempts. "
+                        f"Last error: {e.status} {e.code}"
+                    )
+                    raise
+
+                # Determine retry reason and wait time based on status code
+                if e.status == 429:
+                    # Rate limited - use Retry-After header if available
+                    retry_reason = "rate_limit"
+                    retry_after = e.headers.get("Retry-After") if e.headers else None
+                    try:
+                        # Clamp to [0, MAX_RETRY_DELAY]: a negative, NaN or huge
+                        # header value must neither skip the wait nor stall the sync
+                        wait_time = min(max(0.0, float(retry_after)), MAX_RETRY_DELAY)
+                    except (ValueError, TypeError):
+                        # Header missing (None) or not numeric - use backoff delay
+                        wait_time = retry_delay
+                    logger.warning(
+                        f"Notion API rate limited (429). "
+                        f"Waiting {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}"
+                    )
+                elif e.status == 504:
+                    # Gateway timeout
+                    retry_reason = "timeout"
+                    wait_time = min(retry_delay, MAX_RETRY_DELAY)
+                    logger.warning(
+                        f"Notion API timeout ({e.status}). "
+                        f"Retrying in {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}"
+                    )
+                else:
+                    # Server error (500/502/503) - use exponential backoff
+                    retry_reason = "server_error"
+                    wait_time = min(retry_delay, MAX_RETRY_DELAY)
+                    logger.warning(
+                        f"Notion API error {e.status} ({e.code}). "
+                        f"Retrying in {wait_time}s. 
Attempt {attempt + 1}/{MAX_RETRIES}" + ) + + # Notify about retry via callback (for user notifications) + # Call before sleeping so user sees the message while we wait + if on_retry: + try: + await on_retry( + retry_reason, + attempt + 1, # 1-based for display + MAX_RETRIES, + wait_time, + ) + except Exception as callback_error: + # Don't let callback errors break the retry logic + logger.warning( + f"Retry callback failed: {callback_error}" + ) + + # Wait before retrying + await asyncio.sleep(wait_time) + + # Exponential backoff for next attempt + retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY) + + # This should not be reached, but just in case + if last_exception: + raise last_exception + raise RuntimeError("Unexpected state in retry logic") + async def close(self): """Close the async client connection.""" if self._notion_client: await self._notion_client.aclose() self._notion_client = None + def get_pages_with_skipped_content(self) -> list[str]: + """ + Get list of page titles that had unsupported content skipped. + + Returns: + List of page titles with skipped content + """ + return self._pages_with_skipped_content + + def get_skipped_content_count(self) -> int: + """ + Get count of pages that had unsupported content skipped. + + Returns: + Number of pages with skipped content + """ + return len(self._pages_with_skipped_content) + + def is_using_legacy_token(self) -> bool: + """ + Check if connector is using legacy integration token format. + + Returns: + True if using legacy NOTION_INTEGRATION_TOKEN, False if using OAuth + """ + return self._using_legacy_token + + def _record_skipped_content(self, page_title: str): + """ + Record that a page had unsupported content skipped. 
+ + Args: + page_title: Title of the page with skipped content + """ + if page_title not in self._pages_with_skipped_content: + self._pages_with_skipped_content.append(page_title) + async def __aenter__(self): """Async context manager entry.""" return self @@ -186,7 +437,7 @@ class NotionHistoryConnector: # Build the filter for the search # Note: Notion API requires specific filter structure - search_params = {} + search_params: dict[str, Any] = {} # Filter for pages only (not databases) search_params["filter"] = {"value": "page", "property": "object"} @@ -214,29 +465,53 @@ class NotionHistoryConnector: cursor = None while has_more: - if cursor: - search_params["start_cursor"] = cursor + try: + if cursor: + search_params["start_cursor"] = cursor - search_results = await notion.search(**search_params) + # Use retry wrapper for search API call + search_results = await self._api_call_with_retry( + notion.search, on_retry=self._on_retry_callback, **search_params + ) - pages.extend(search_results["results"]) - has_more = search_results.get("has_more", False) + pages.extend(search_results["results"]) + has_more = search_results.get("has_more", False) - if has_more: - cursor = search_results.get("next_cursor") + if has_more: + cursor = search_results.get("next_cursor") + + except APIResponseError as e: + error_message = str(e) + # Handle invalid cursor - stop pagination gracefully + if "start_cursor provided is invalid" in error_message: + logger.warning( + f"Invalid pagination cursor encountered. " + f"Continuing with {len(pages)} pages already fetched." 
+ ) + has_more = False + continue + # Re-raise other errors + raise all_page_data = [] for page in pages: page_id = page["id"] + page_title = self.get_page_title(page) - # Get detailed page information - page_content = await self.get_page_content(page_id) + # Get detailed page information (pass title for skip tracking) + page_content, had_skipped_content = await self.get_page_content( + page_id, page_title + ) + + # Record if this page had skipped content + if had_skipped_content: + self._record_skipped_content(page_title) all_page_data.append( { "page_id": page_id, - "title": self.get_page_title(page), + "title": page_title, "content": page_content, } ) @@ -265,46 +540,93 @@ class NotionHistoryConnector: # If no title found, return the page ID as fallback return f"Untitled page ({page['id']})" - async def get_page_content(self, page_id): + async def get_page_content( + self, page_id: str, page_title: str | None = None + ) -> tuple[list, bool]: """ Fetches the content (blocks) of a specific page. 
Args: page_id (str): The ID of the page to fetch + page_title (str, optional): Title of the page (for logging) Returns: - list: List of processed blocks from the page + tuple: (List of processed blocks, bool indicating if content was skipped) """ notion = await self._get_client() blocks = [] has_more = True cursor = None + skipped_blocks_count = 0 + had_skipped_content = False # Paginate through all blocks while has_more: - if cursor: - response = await notion.blocks.children.list( - block_id=page_id, start_cursor=cursor - ) - else: - response = await notion.blocks.children.list(block_id=page_id) + try: + # Use retry wrapper for blocks.children.list API call + if cursor: + response = await self._api_call_with_retry( + notion.blocks.children.list, + on_retry=self._on_retry_callback, + block_id=page_id, + start_cursor=cursor, + ) + else: + response = await self._api_call_with_retry( + notion.blocks.children.list, + on_retry=self._on_retry_callback, + block_id=page_id, + ) - blocks.extend(response["results"]) - has_more = response["has_more"] + blocks.extend(response["results"]) + has_more = response["has_more"] - if has_more: - cursor = response["next_cursor"] + if has_more: + cursor = response["next_cursor"] + + except APIResponseError as e: + error_message = str(e) + # Check if this is an unsupported block type error + if any( + err in error_message for err in UNSUPPORTED_BLOCK_TYPE_ERRORS + ): + logger.warning( + f"Skipping page blocks due to unsupported block type in page {page_id}: {error_message}" + ) + skipped_blocks_count += 1 + had_skipped_content = True + # If we haven't fetched any blocks yet, return empty + # If we have some blocks, continue with what we have + has_more = False + continue + elif "Could not find block" in error_message: + logger.warning( + f"Block not found in page {page_id}, continuing with available blocks: {error_message}" + ) + has_more = False + continue + # Re-raise other API errors (after retry exhaustion) + raise + + if 
skipped_blocks_count > 0: + logger.info( + f"Page {page_id}: Skipped {skipped_blocks_count} unsupported block sections, " + f"successfully processed {len(blocks)} blocks" + ) # Process nested blocks recursively processed_blocks = [] for block in blocks: - processed_block = await self.process_block(block) - processed_blocks.append(processed_block) + processed_block, block_had_skips = await self.process_block(block) + if processed_block: # Only add if block was processed successfully + processed_blocks.append(processed_block) + if block_had_skips: + had_skipped_content = True - return processed_blocks + return processed_blocks, had_skipped_content - async def process_block(self, block): + async def process_block(self, block) -> tuple[dict | None, bool]: """ Processes a block and recursively fetches any child blocks. @@ -312,12 +634,28 @@ class NotionHistoryConnector: block (dict): The block to process Returns: - dict: Processed block with content and children + tuple: (Processed block dict or None, bool indicating if content was skipped) """ notion = await self._get_client() block_id = block["id"] block_type = block["type"] + had_skipped_content = False + + # Check if this is a known unsupported block type before processing + if block_type in UNSUPPORTED_BLOCK_TYPES: + logger.debug( + f"Skipping unsupported block type: {block_type} (block_id: {block_id})" + ) + return ( + { + "id": block_id, + "type": block_type, + "content": f"[{block_type} block - not supported by Notion API]", + "children": [], + }, + True, # Content was skipped + ) # Extract block content based on its type content = self.extract_block_content(block) @@ -327,17 +665,50 @@ class NotionHistoryConnector: child_blocks = [] if has_children: - # Fetch and process child blocks - children_response = await notion.blocks.children.list(block_id=block_id) - for child_block in children_response["results"]: - child_blocks.append(await self.process_block(child_block)) + try: + # Use retry wrapper for 
blocks.children.list API call + children_response = await self._api_call_with_retry( + notion.blocks.children.list, + on_retry=self._on_retry_callback, + block_id=block_id, + ) + for child_block in children_response["results"]: + processed_child, child_had_skips = await self.process_block( + child_block + ) + if processed_child: + child_blocks.append(processed_child) + if child_had_skips: + had_skipped_content = True + except APIResponseError as e: + error_message = str(e) + # Check if this is an unsupported block type error + if any( + err in error_message for err in UNSUPPORTED_BLOCK_TYPE_ERRORS + ): + logger.warning( + f"Skipping children of block {block_id} due to unsupported block type: {error_message}" + ) + had_skipped_content = True + # Continue without children instead of failing + elif "Could not find block" in error_message: + logger.warning( + f"Block {block_id} children not accessible, skipping: {error_message}" + ) + # Continue without children + else: + # Re-raise other API errors (after retry exhaustion) + raise - return { - "id": block_id, - "type": block_type, - "content": content, - "children": child_blocks, - } + return ( + { + "id": block_id, + "type": block_type, + "content": content, + "children": child_blocks, + }, + had_skipped_content, + ) def extract_block_content(self, block): """ diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index c4d4fdb3b..a27c2125c 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1149,6 +1149,7 @@ async def _run_indexing_with_notifications( end_date: str, indexing_function, update_timestamp_func=None, + supports_retry_callback: bool = False, ): """ Generic helper to run indexing with real-time notifications. 
@@ -1162,10 +1163,14 @@ async def _run_indexing_with_notifications( end_date: End date for indexing indexing_function: Async function that performs the indexing update_timestamp_func: Optional function to update connector timestamp + supports_retry_callback: Whether the indexing function supports on_retry_callback """ from uuid import UUID notification = None + # Track indexed count for retry notifications + current_indexed_count = 0 + try: # Get connector info for notification connector_result = await session.execute( @@ -1199,17 +1204,47 @@ async def _run_indexing_with_notifications( stage="fetching", ) + # Create retry callback for connectors that support it + async def on_retry_callback( + retry_reason: str, attempt: int, max_attempts: int, wait_seconds: float + ) -> None: + """Callback to update notification during API retries (rate limits, etc.)""" + nonlocal notification + if notification: + try: + await session.refresh(notification) + await NotificationService.connector_indexing.notify_retry_progress( + session=session, + notification=notification, + indexed_count=current_indexed_count, + retry_reason=retry_reason, + attempt=attempt, + max_attempts=max_attempts, + wait_seconds=wait_seconds, + ) + await session.commit() + except Exception as e: + # Don't let notification errors break the indexing + logger.warning(f"Failed to update retry notification: {e}") + + # Build kwargs for indexing function + indexing_kwargs = { + "session": session, + "connector_id": connector_id, + "search_space_id": search_space_id, + "user_id": user_id, + "start_date": start_date, + "end_date": end_date, + "update_last_indexed": False, + } + + # Add retry callback for connectors that support it + if supports_retry_callback: + indexing_kwargs["on_retry_callback"] = on_retry_callback + # Run the indexing function # Some indexers return (indexed, error), others return (indexed, skipped, error) - result = await indexing_function( - session=session, - connector_id=connector_id, - 
search_space_id=search_space_id, - user_id=user_id, - start_date=start_date, - end_date=end_date, - update_last_indexed=False, - ) + result = await indexing_function(**indexing_kwargs) # Handle both 2-tuple and 3-tuple returns for backwards compatibility if len(result) == 3: @@ -1290,8 +1325,15 @@ async def _run_indexing_with_notifications( "no " in error_or_warning_lower and "found" in error_or_warning_lower ) + # Informational warnings - sync succeeded but some content couldn't be synced + # These are NOT errors, just notifications about API limitations or recommendations + is_info_warning = ( + "couldn't be synced" in error_or_warning_lower + or "using legacy token" in error_or_warning_lower + or "(api limitation)" in error_or_warning_lower + ) - if is_duplicate_warning or is_empty_result: + if is_duplicate_warning or is_empty_result or is_info_warning: # These are success cases - sync worked, just found nothing new logger.info(f"Indexing completed successfully: {error_or_warning}") # Still update timestamp so ElectricSQL syncs and clears "Syncing" UI @@ -1396,6 +1438,7 @@ async def run_notion_indexing_with_new_session( end_date=end_date, indexing_function=index_notion_pages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_retry_callback=True, # Notion connector supports retry notifications ) @@ -1427,6 +1470,7 @@ async def run_notion_indexing( end_date=end_date, indexing_function=index_notion_pages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_retry_callback=True, # Notion connector supports retry notifications ) diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 34acbad88..1a91d000f 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -329,6 +329,92 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): metadata_updates=metadata_updates, ) + 
async def notify_retry_progress(
+        self,
+        session: AsyncSession,
+        notification: Notification,
+        indexed_count: int,
+        retry_reason: str,
+        attempt: int,
+        max_attempts: int,
+        wait_seconds: float | None = None,
+        service_name: str | None = None,
+    ) -> Notification:
+        """
+        Update notification when a connector is retrying due to rate limits or errors.
+
+        This method provides user-friendly feedback when external service limitations
+        (rate limits, temporary outages) cause delays. Users see that the delay is
+        not our fault and the sync is still progressing.
+
+        This method can be used by ANY connector (Notion, Slack, Airtable, etc.)
+        when they hit rate limits or transient errors.
+
+        Args:
+            session: Database session
+            notification: Notification to update
+            indexed_count: Number of items indexed so far
+            retry_reason: Reason for retry ('rate_limit', 'server_error', 'timeout')
+            attempt: Current retry attempt number (1-based)
+            max_attempts: Maximum number of retry attempts
+            wait_seconds: Seconds to wait before retry (optional, for display)
+            service_name: Name of the external service (e.g., 'Notion', 'Slack')
+                If not provided, read from notification metadata (default "Service")
+
+        Returns:
+            Updated notification
+        """
+        # Get service name from notification if not provided (metadata may be null)
+        if not service_name:
+            service_name = (notification.notification_metadata or {}).get(
+                "connector_name"
+            ) or "Service"
+            # Extract just the service name if it's "Notion - My Workspace"
+            if " - " in service_name:
+                service_name = service_name.split(" - ")[0]
+
+        # User-friendly messages for different retry reasons
+        # These make it clear the delay is due to the external service, not SurfSense
+        retry_messages = {
+            "rate_limit": f"{service_name} rate limit reached",
+            "server_error": f"{service_name} is slow to respond",
+            "timeout": f"{service_name} took too long",
+            "temporary_error": f"{service_name} temporarily unavailable",
+        }
+
+        base_message = retry_messages.get(
+            retry_reason, f"Waiting for 
{service_name}" + ) + + # Add wait time and progress info + if wait_seconds and wait_seconds > 5: + # Only show wait time if it's significant + message = f"{base_message}. Retrying in {int(wait_seconds)}s..." + else: + message = f"{base_message}. Retrying..." + + # Add progress count if we have any + if indexed_count > 0: + item_text = "item" if indexed_count == 1 else "items" + message = f"{message} ({indexed_count} {item_text} synced so far)" + + metadata_updates = { + "indexed_count": indexed_count, + "sync_stage": "waiting_retry", + "retry_attempt": attempt, + "retry_max_attempts": max_attempts, + "retry_reason": retry_reason, + "retry_wait_seconds": wait_seconds, + } + + return await self.update_notification( + session=session, + notification=notification, + message=message, + status="in_progress", + metadata_updates=metadata_updates, + ) + async def notify_indexing_completed( self, session: AsyncSession, diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 70c4917da..b8d2297c5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -2,6 +2,7 @@ Notion connector indexer. 
""" +from collections.abc import Awaitable, Callable from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -29,6 +30,10 @@ from .base import ( update_connector_last_indexed, ) +# Type alias for retry callback +# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None +RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]] + async def index_notion_pages( session: AsyncSession, @@ -38,6 +43,7 @@ async def index_notion_pages( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_retry_callback: RetryCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Notion pages from all accessible pages. @@ -50,6 +56,9 @@ async def index_notion_pages( start_date: Start date for indexing (YYYY-MM-DD format) end_date: End date for indexing (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_retry_callback: Optional callback for retry progress notifications. 
+ Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) + retry_reason is one of: 'rate_limit', 'server_error', 'timeout' Returns: Tuple containing (number of documents indexed, error message or None) @@ -139,6 +148,10 @@ async def index_notion_pages( session=session, connector_id=connector_id ) + # Set retry callback if provided (for user notifications during rate limits) + if on_retry_callback: + notion_client.set_retry_callback(on_retry_callback) + logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}") await task_logger.log_task_progress( @@ -157,6 +170,20 @@ async def index_notion_pages( start_date=start_date_iso, end_date=end_date_iso ) logger.info(f"Found {len(pages)} Notion pages") + + # Get count of pages that had unsupported content skipped + pages_with_skipped_content = notion_client.get_skipped_content_count() + if pages_with_skipped_content > 0: + logger.info( + f"{pages_with_skipped_content} pages had Notion AI content skipped (not available via API)" + ) + + # Check if using legacy integration token and log warning + if notion_client.is_using_legacy_token(): + logger.warning( + f"Connector {connector_id} is using legacy integration token. " + "Recommend reconnecting with OAuth." + ) except Exception as e: await task_logger.log_task_failure( log_entry, @@ -171,12 +198,13 @@ async def index_notion_pages( if not pages: await task_logger.log_task_success( log_entry, - f"No Notion pages found for connector {connector_id}", + f"No Notion pages found for connector {connector_id}. 
" + "Ensure pages are shared with the Notion integration.", {"pages_found": 0}, ) logger.info("No Notion pages found to index") await notion_client.close() - return 0, "No Notion pages found" + return 0, None # Success with 0 pages, not an error # Track the number of documents indexed documents_indexed = 0 @@ -454,13 +482,23 @@ async def index_notion_pages( logger.info(f"Final commit: Total {documents_indexed} documents processed") await session.commit() - # Prepare result message + # Get final count of pages with skipped Notion AI content + pages_with_skipped_ai_content = notion_client.get_skipped_content_count() + + # Prepare result message with user-friendly notification about skipped content result_message = None if skipped_pages: result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}" else: result_message = f"Processed {total_processed} pages." + # Add user-friendly message about skipped Notion AI content + if pages_with_skipped_ai_content > 0: + result_message += ( + " Audio transcriptions and AI summaries from Notion aren't accessible " + "via their API - all other content was saved." + ) + # Log success await task_logger.log_task_success( log_entry, @@ -470,6 +508,7 @@ async def index_notion_pages( "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, "skipped_pages_count": len(skipped_pages), + "pages_with_skipped_ai_content": pages_with_skipped_ai_content, "result_message": result_message, }, ) @@ -481,10 +520,26 @@ async def index_notion_pages( # Clean up the async client await notion_client.close() + # Build user-friendly notification messages + # This will be shown in the notification to inform users + notification_parts = [] + + if pages_with_skipped_ai_content > 0: + notification_parts.append( + "Some Notion AI content couldn't be synced (API limitation)" + ) + + if notion_client.is_using_legacy_token(): + notification_parts.append( + "Using legacy token. 
Reconnect with OAuth for better reliability." + ) + + user_notification_message = " ".join(notification_parts) if notification_parts else None + return ( total_processed, - None, - ) # Return None on success (result_message is for logging only) + user_notification_message, + ) except SQLAlchemyError as db_error: await session.rollback() diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx index 35815b0b7..72069441a 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx @@ -218,7 +218,7 @@ export const IndexingConfigurationView: FC = ({ {isStartingIndexing ? ( <> - Starting... + Starting ) : ( "Start Indexing" diff --git a/surfsense_web/content/docs/connectors/notion.mdx b/surfsense_web/content/docs/connectors/notion.mdx index 0612c4f4f..6fcda8dae 100644 --- a/surfsense_web/content/docs/connectors/notion.mdx +++ b/surfsense_web/content/docs/connectors/notion.mdx @@ -66,6 +66,29 @@ Click **Save** to apply the capabilities. --- +## Limitations & Unsupported Content + +Notion's API has limitations on certain block types that cannot be retrieved. SurfSense will automatically skip these unsupported blocks and continue syncing all other content. + +### Unsupported Block Types + +The following Notion features are **not accessible via the Notion API** and will be skipped during sync: + +- **Transcription blocks** - Audio/video transcriptions from Notion AI +- **AI blocks** - AI-generated content blocks + +### Learn More + +The Notion API only supports specific block types for retrieval. 
The official list of **supported block types** is documented in Notion's Block reference: + +- **[Block Object Reference](https://developers.notion.com/reference/block)** - Official documentation listing all supported block types. Any block type not listed here (such as `transcription` and `ai_block`) is not accessible via the Notion API. + +For additional information: +- [Working with Page Content](https://developers.notion.com/docs/working-with-page-content) - Guide on how the Notion API handles page content +- [Notion API Reference](https://developers.notion.com/reference) - Complete API documentation + +--- + ## Running SurfSense with Notion Connector Add the Notion environment variables to your Docker run command: