feat(backend): Add retry logic for Notion API calls with user notifications on rate limits and errors

This commit is contained in:
Anish Sarkar 2026-01-28 18:36:42 +05:30
parent 41ebe162b0
commit 33316fa6db
4 changed files with 330 additions and 19 deletions

View file

@ -1,4 +1,7 @@
import asyncio
import logging
from collections.abc import Awaitable, Callable
from typing import Any, TypeVar
from notion_client import AsyncClient
from notion_client.errors import APIResponseError
@ -13,6 +16,32 @@ from app.utils.oauth_security import TokenEncryption
logger = logging.getLogger(__name__)
# Type variable for generic return type
T = TypeVar("T")
# ============================================================================
# Retry Configuration (per Notion API docs)
# https://developers.notion.com/reference/request-limits
# https://developers.notion.com/reference/status-codes
# ============================================================================
MAX_RETRIES = 5
BASE_RETRY_DELAY = 1.0 # seconds
MAX_RETRY_DELAY = 60.0 # seconds (Notion's max request timeout)
# Type alias for retry callback function
# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None
# retry_reason: 'rate_limit', 'server_error', 'timeout'
# This callback can be used to update notifications during retries
RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]]
# HTTP status codes that should trigger a retry
# 429: rate_limited - Use Retry-After header
# 500: internal_server_error - Unexpected error
# 502: bad_gateway - Failed upstream connection
# 503: service_unavailable - Notion unavailable or timeout
# 504: gateway_timeout - Notion timed out
RETRYABLE_STATUS_CODES = frozenset({429, 500, 502, 503, 504})
# Known unsupported block types that Notion API doesn't expose
# These will be skipped gracefully instead of failing the entire sync
UNSUPPORTED_BLOCK_TYPE_ERRORS = [
@ -46,6 +75,24 @@ class NotionHistoryConnector:
self._notion_client: AsyncClient | None = None
# Track pages with skipped unsupported content (for user notifications)
self._pages_with_skipped_content: list[str] = []
# Optional callback to notify about retry progress (for user notifications)
self._on_retry_callback: RetryCallbackType | None = None
def set_retry_callback(self, callback: RetryCallbackType | None) -> None:
"""
Set a callback function to be called when API calls are retried.
This allows the indexer to receive notifications about rate limits
and other transient errors, which can be used to update user-facing
notifications.
Args:
callback: Async function with signature:
callback(retry_reason, attempt, max_attempts, wait_seconds) -> None
retry_reason: 'rate_limit', 'server_error', or 'timeout'
Set to None to disable callbacks.
"""
self._on_retry_callback = callback
async def _get_valid_token(self) -> str:
"""
@ -171,6 +218,120 @@ class NotionHistoryConnector:
self._notion_client = AsyncClient(auth=token)
return self._notion_client
async def _api_call_with_retry(
self,
api_func: Callable[..., Awaitable[T]],
*args: Any,
on_retry: RetryCallbackType | None = None,
**kwargs: Any,
) -> T:
"""
Execute Notion API call with retry logic and exponential backoff.
Handles retryable errors per Notion API documentation:
- 429 rate_limited: Uses Retry-After header value
- 500 internal_server_error: Retries with exponential backoff
- 502 bad_gateway: Retries with exponential backoff
- 503 service_unavailable: Retries with exponential backoff
- 504 gateway_timeout: Retries with exponential backoff
Args:
api_func: The async Notion API function to call
*args: Positional arguments to pass to the API function
on_retry: Optional callback to notify about retry progress.
Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds)
retry_reason is one of: 'rate_limit', 'server_error', 'timeout'
**kwargs: Keyword arguments to pass to the API function
Returns:
The result from the API call
Raises:
APIResponseError: If all retries are exhausted or error is not retryable
"""
last_exception: APIResponseError | None = None
retry_delay = BASE_RETRY_DELAY
for attempt in range(MAX_RETRIES):
try:
return await api_func(*args, **kwargs)
except APIResponseError as e:
last_exception = e
# Check if this error is retryable
if e.status not in RETRYABLE_STATUS_CODES:
# Not retryable (e.g., 400, 401, 403, 404) - raise immediately
raise
# Check if we've exhausted retries
if attempt == MAX_RETRIES - 1:
logger.error(
f"Notion API call failed after {MAX_RETRIES} retries. "
f"Last error: {e.status} {e.code}"
)
raise
# Determine retry reason and wait time based on status code
if e.status == 429:
# Rate limited - use Retry-After header if available
retry_reason = "rate_limit"
retry_after = e.headers.get("Retry-After") if e.headers else None
if retry_after:
try:
wait_time = float(retry_after)
except (ValueError, TypeError):
wait_time = retry_delay
else:
wait_time = retry_delay
logger.warning(
f"Notion API rate limited (429). "
f"Waiting {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}"
)
elif e.status == 504:
# Gateway timeout
retry_reason = "timeout"
wait_time = min(retry_delay, MAX_RETRY_DELAY)
logger.warning(
f"Notion API timeout ({e.status}). "
f"Retrying in {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}"
)
else:
# Server error (500/502/503) - use exponential backoff
retry_reason = "server_error"
wait_time = min(retry_delay, MAX_RETRY_DELAY)
logger.warning(
f"Notion API error {e.status} ({e.code}). "
f"Retrying in {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}"
)
# Notify about retry via callback (for user notifications)
# Call before sleeping so user sees the message while we wait
if on_retry:
try:
await on_retry(
retry_reason,
attempt + 1, # 1-based for display
MAX_RETRIES,
wait_time,
)
except Exception as callback_error:
# Don't let callback errors break the retry logic
logger.warning(
f"Retry callback failed: {callback_error}"
)
# Wait before retrying
await asyncio.sleep(wait_time)
# Exponential backoff for next attempt
retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY)
# This should not be reached, but just in case
if last_exception:
raise last_exception
raise RuntimeError("Unexpected state in retry logic")
async def close(self):
"""Close the async client connection."""
if self._notion_client:
@ -228,7 +389,7 @@ class NotionHistoryConnector:
# Build the filter for the search
# Note: Notion API requires specific filter structure
search_params = {}
search_params: dict[str, Any] = {}
# Filter for pages only (not databases)
search_params["filter"] = {"value": "page", "property": "object"}
@ -259,7 +420,10 @@ class NotionHistoryConnector:
if cursor:
search_params["start_cursor"] = cursor
search_results = await notion.search(**search_params)
# Use retry wrapper for search API call
search_results = await self._api_call_with_retry(
notion.search, on_retry=self._on_retry_callback, **search_params
)
pages.extend(search_results["results"])
has_more = search_results.get("has_more", False)
@ -338,12 +502,20 @@ class NotionHistoryConnector:
# Paginate through all blocks
while has_more:
try:
# Use retry wrapper for blocks.children.list API call
if cursor:
response = await notion.blocks.children.list(
block_id=page_id, start_cursor=cursor
response = await self._api_call_with_retry(
notion.blocks.children.list,
on_retry=self._on_retry_callback,
block_id=page_id,
start_cursor=cursor,
)
else:
response = await notion.blocks.children.list(block_id=page_id)
response = await self._api_call_with_retry(
notion.blocks.children.list,
on_retry=self._on_retry_callback,
block_id=page_id,
)
blocks.extend(response["results"])
has_more = response["has_more"]
@ -372,7 +544,7 @@ class NotionHistoryConnector:
)
has_more = False
continue
# Re-raise other API errors
# Re-raise other API errors (after retry exhaustion)
raise
if skipped_blocks_count > 0:
@ -432,9 +604,11 @@ class NotionHistoryConnector:
if has_children:
try:
# Fetch and process child blocks
children_response = await notion.blocks.children.list(
block_id=block_id
# Use retry wrapper for blocks.children.list API call
children_response = await self._api_call_with_retry(
notion.blocks.children.list,
on_retry=self._on_retry_callback,
block_id=block_id,
)
for child_block in children_response["results"]:
processed_child, child_had_skips = await self.process_block(
@ -461,7 +635,7 @@ class NotionHistoryConnector:
)
# Continue without children
else:
# Re-raise other API errors
# Re-raise other API errors (after retry exhaustion)
raise
return (

View file

@ -1129,6 +1129,7 @@ async def _run_indexing_with_notifications(
end_date: str,
indexing_function,
update_timestamp_func=None,
supports_retry_callback: bool = False,
):
"""
Generic helper to run indexing with real-time notifications.
@ -1142,10 +1143,14 @@ async def _run_indexing_with_notifications(
end_date: End date for indexing
indexing_function: Async function that performs the indexing
update_timestamp_func: Optional function to update connector timestamp
supports_retry_callback: Whether the indexing function supports on_retry_callback
"""
from uuid import UUID
notification = None
# Track indexed count for retry notifications
current_indexed_count = 0
try:
# Get connector info for notification
connector_result = await session.execute(
@ -1179,16 +1184,47 @@ async def _run_indexing_with_notifications(
stage="fetching",
)
# Create retry callback for connectors that support it
async def on_retry_callback(
retry_reason: str, attempt: int, max_attempts: int, wait_seconds: float
) -> None:
"""Callback to update notification during API retries (rate limits, etc.)"""
nonlocal notification
if notification:
try:
await session.refresh(notification)
await NotificationService.connector_indexing.notify_retry_progress(
session=session,
notification=notification,
indexed_count=current_indexed_count,
retry_reason=retry_reason,
attempt=attempt,
max_attempts=max_attempts,
wait_seconds=wait_seconds,
)
await session.commit()
except Exception as e:
# Don't let notification errors break the indexing
logger.warning(f"Failed to update retry notification: {e}")
# Build kwargs for indexing function
indexing_kwargs = {
"session": session,
"connector_id": connector_id,
"search_space_id": search_space_id,
"user_id": user_id,
"start_date": start_date,
"end_date": end_date,
"update_last_indexed": False,
}
# Add retry callback for connectors that support it
if supports_retry_callback:
indexing_kwargs["on_retry_callback"] = on_retry_callback
# Run the indexing function
documents_processed, error_or_warning = await indexing_function(
session=session,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
start_date=start_date,
end_date=end_date,
update_last_indexed=False,
)
documents_processed, error_or_warning = await indexing_function(**indexing_kwargs)
current_indexed_count = documents_processed
# Update connector timestamp if function provided and indexing was successful
if documents_processed > 0 and update_timestamp_func:
@ -1362,6 +1398,7 @@ async def run_notion_indexing_with_new_session(
end_date=end_date,
indexing_function=index_notion_pages,
update_timestamp_func=_update_connector_timestamp_by_id,
supports_retry_callback=True, # Notion connector supports retry notifications
)
@ -1393,6 +1430,7 @@ async def run_notion_indexing(
end_date=end_date,
indexing_function=index_notion_pages,
update_timestamp_func=_update_connector_timestamp_by_id,
supports_retry_callback=True, # Notion connector supports retry notifications
)

View file

@ -329,6 +329,92 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
metadata_updates=metadata_updates,
)
async def notify_retry_progress(
self,
session: AsyncSession,
notification: Notification,
indexed_count: int,
retry_reason: str,
attempt: int,
max_attempts: int,
wait_seconds: float | None = None,
service_name: str | None = None,
) -> Notification:
"""
Update notification when a connector is retrying due to rate limits or errors.
This method provides user-friendly feedback when external service limitations
(rate limits, temporary outages) cause delays. Users see that the delay is
not our fault and the sync is still progressing.
This method can be used by ANY connector (Notion, Slack, Airtable, etc.)
when they hit rate limits or transient errors.
Args:
session: Database session
notification: Notification to update
indexed_count: Number of items indexed so far
retry_reason: Reason for retry ('rate_limit', 'server_error', 'timeout')
attempt: Current retry attempt number (1-based)
max_attempts: Maximum number of retry attempts
wait_seconds: Seconds to wait before retry (optional, for display)
service_name: Name of the external service (e.g., 'Notion', 'Slack')
If not provided, extracts from notification metadata
Returns:
Updated notification
"""
# Get service name from notification if not provided
if not service_name:
service_name = notification.notification_metadata.get(
"connector_name", "Service"
)
# Extract just the service name if it's "Notion - My Workspace"
if " - " in service_name:
service_name = service_name.split(" - ")[0]
# User-friendly messages for different retry reasons
# These make it clear the delay is due to the external service, not SurfSense
retry_messages = {
"rate_limit": f"{service_name} rate limit reached",
"server_error": f"{service_name} is slow to respond",
"timeout": f"{service_name} took too long",
"temporary_error": f"{service_name} temporarily unavailable",
}
base_message = retry_messages.get(
retry_reason, f"Waiting for {service_name}"
)
# Add wait time and progress info
if wait_seconds and wait_seconds > 5:
# Only show wait time if it's significant
message = f"{base_message}. Retrying in {int(wait_seconds)}s..."
else:
message = f"{base_message}. Retrying..."
# Add progress count if we have any
if indexed_count > 0:
item_text = "item" if indexed_count == 1 else "items"
message = f"{message} ({indexed_count} {item_text} synced so far)"
metadata_updates = {
"indexed_count": indexed_count,
"sync_stage": "waiting_retry",
"retry_attempt": attempt,
"retry_max_attempts": max_attempts,
"retry_reason": retry_reason,
"retry_wait_seconds": wait_seconds,
}
return await self.update_notification(
session=session,
notification=notification,
message=message,
status="in_progress",
metadata_updates=metadata_updates,
)
async def notify_indexing_completed(
self,
session: AsyncSession,

View file

@ -2,6 +2,7 @@
Notion connector indexer.
"""
from collections.abc import Awaitable, Callable
from datetime import datetime
from sqlalchemy.exc import SQLAlchemyError
@ -28,6 +29,10 @@ from .base import (
update_connector_last_indexed,
)
# Type alias for retry callback
# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None
RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]]
async def index_notion_pages(
session: AsyncSession,
@ -37,6 +42,7 @@ async def index_notion_pages(
start_date: str | None = None,
end_date: str | None = None,
update_last_indexed: bool = True,
on_retry_callback: RetryCallbackType | None = None,
) -> tuple[int, str | None]:
"""
Index Notion pages from all accessible pages.
@ -49,6 +55,9 @@ async def index_notion_pages(
start_date: Start date for indexing (YYYY-MM-DD format)
end_date: End date for indexing (YYYY-MM-DD format)
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
on_retry_callback: Optional callback for retry progress notifications.
Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds)
retry_reason is one of: 'rate_limit', 'server_error', 'timeout'
Returns:
Tuple containing (number of documents indexed, error message or None)
@ -138,6 +147,10 @@ async def index_notion_pages(
session=session, connector_id=connector_id
)
# Set retry callback if provided (for user notifications during rate limits)
if on_retry_callback:
notion_client.set_retry_callback(on_retry_callback)
logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}")
await task_logger.log_task_progress(