mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-08 20:25:19 +02:00
Merge branch 'dev' of https://github.com/MODSetter/SurfSense into dev
This commit is contained in:
commit
8e6826dd66
6 changed files with 635 additions and 56 deletions
|
|
@ -1,6 +1,10 @@
|
|||
import asyncio
|
||||
import logging
|
||||
from collections.abc import Awaitable, Callable
|
||||
from typing import Any, TypeVar
|
||||
|
||||
from notion_client import AsyncClient
|
||||
from notion_client.errors import APIResponseError
|
||||
from sqlalchemy.ext.asyncio import AsyncSession
|
||||
from sqlalchemy.future import select
|
||||
|
||||
|
|
@ -12,6 +16,43 @@ from app.utils.oauth_security import TokenEncryption
|
|||
|
||||
logger = logging.getLogger(__name__)

# Generic result type of the coroutine wrapped by the retry helper.
T = TypeVar("T")

# ----------------------------------------------------------------------------
# Retry settings, chosen per the Notion API documentation:
#   https://developers.notion.com/reference/request-limits
#   https://developers.notion.com/reference/status-codes
# ----------------------------------------------------------------------------
MAX_RETRIES = 5
BASE_RETRY_DELAY = 1.0  # seconds
MAX_RETRY_DELAY = 60.0  # seconds (Notion's max request timeout)

# Async hook invoked before each retry wait so callers can update
# user-facing notifications:
#   callback(retry_reason, attempt, max_attempts, wait_seconds) -> None
# retry_reason is one of 'rate_limit', 'server_error', 'timeout'.
RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]]

# HTTP statuses that are retried:
#   429 rate_limited           - honour the Retry-After header
#   500 internal_server_error  - unexpected error
#   502 bad_gateway            - failed upstream connection
#   503 service_unavailable    - Notion unavailable or timed out
#   504 gateway_timeout        - Notion timed out
RETRYABLE_STATUS_CODES = frozenset((429, 500, 502, 503, 504))

# Error-message fragments for block types the Notion API does not expose.
# Matching errors are skipped gracefully instead of failing the whole sync.
UNSUPPORTED_BLOCK_TYPE_ERRORS = [
    "transcription is not supported",
    "ai_block is not supported",
    "is not supported via the API",
]

# Block types known to be unsupported; checked before any API call is made.
UNSUPPORTED_BLOCK_TYPES = ["transcription", "ai_block"]
|
||||
|
||||
|
||||
class NotionHistoryConnector:
|
||||
def __init__(
|
||||
|
|
@ -32,6 +73,28 @@ class NotionHistoryConnector:
|
|||
self._connector_id = connector_id
|
||||
self._credentials = credentials
|
||||
self._notion_client: AsyncClient | None = None
|
||||
# Track pages with skipped unsupported content (for user notifications)
|
||||
self._pages_with_skipped_content: list[str] = []
|
||||
# Optional callback to notify about retry progress (for user notifications)
|
||||
self._on_retry_callback: RetryCallbackType | None = None
|
||||
# Track if using legacy integration token (for upgrade notification)
|
||||
self._using_legacy_token: bool = False
|
||||
|
||||
def set_retry_callback(self, callback: RetryCallbackType | None) -> None:
    """
    Register (or clear) the hook stored for retry notifications.

    The value is kept on the instance as ``_on_retry_callback``; it is not
    invoked here.

    Args:
        callback: Async callable with the signature
            callback(retry_reason, attempt, max_attempts, wait_seconds) -> None
            where retry_reason is 'rate_limit', 'server_error', or 'timeout'.
            Pass None to disable callbacks.
    """
    self._on_retry_callback = callback
|
||||
|
||||
async def _get_valid_token(self) -> str:
|
||||
"""
|
||||
|
|
@ -58,6 +121,18 @@ class NotionHistoryConnector:
|
|||
|
||||
config_data = connector.config.copy()
|
||||
|
||||
# Check for legacy integration token format first
|
||||
# (for connectors created before OAuth was implemented)
|
||||
legacy_token = config_data.get("NOTION_INTEGRATION_TOKEN")
|
||||
raw_access_token = config_data.get("access_token")
|
||||
|
||||
# Validate that we have some form of token
|
||||
if not raw_access_token and not legacy_token:
|
||||
raise ValueError(
|
||||
"Notion integration not properly connected. "
|
||||
"Please remove and re-add the Notion connector."
|
||||
)
|
||||
|
||||
# Decrypt credentials if they are encrypted
|
||||
token_encrypted = config_data.get("_token_encrypted", False)
|
||||
if token_encrypted and config.SECRET_KEY:
|
||||
|
|
@ -82,13 +157,38 @@ class NotionHistoryConnector:
|
|||
f"Failed to decrypt Notion credentials for connector {self._connector_id}: {e!s}"
|
||||
)
|
||||
raise ValueError(
|
||||
f"Failed to decrypt Notion credentials: {e!s}"
|
||||
"Notion credentials could not be decrypted. "
|
||||
"Please remove and re-add the Notion connector."
|
||||
) from e
|
||||
|
||||
# Handle legacy format: convert NOTION_INTEGRATION_TOKEN to access_token
|
||||
if not config_data.get("access_token") and legacy_token:
|
||||
config_data["access_token"] = legacy_token
|
||||
self._using_legacy_token = True
|
||||
logger.info(
|
||||
f"Using legacy NOTION_INTEGRATION_TOKEN for connector {self._connector_id}"
|
||||
)
|
||||
|
||||
# Final validation: ensure we have a valid access_token after all processing
|
||||
final_token = config_data.get("access_token")
|
||||
if not final_token or (isinstance(final_token, str) and not final_token.strip()):
|
||||
raise ValueError(
|
||||
"Notion access token is invalid or empty. "
|
||||
"Please remove and re-add the Notion connector."
|
||||
)
|
||||
|
||||
try:
|
||||
self._credentials = NotionAuthCredentialsBase.from_dict(config_data)
|
||||
except KeyError as e:
|
||||
raise ValueError(
|
||||
f"Notion credentials are incomplete (missing {e}). "
|
||||
"Please reconnect your Notion account."
|
||||
) from e
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid Notion credentials: {e!s}") from e
|
||||
raise ValueError(
|
||||
f"Notion credentials format error: {e!s}. "
|
||||
"Please reconnect your Notion account."
|
||||
) from e
|
||||
|
||||
# Check if token is expired and refreshable
|
||||
if self._credentials.is_expired and self._credentials.is_refreshable:
|
||||
|
|
@ -157,12 +257,163 @@ class NotionHistoryConnector:
|
|||
self._notion_client = AsyncClient(auth=token)
|
||||
return self._notion_client
|
||||
|
||||
async def _api_call_with_retry(
    self,
    api_func: Callable[..., Awaitable[T]],
    *args: Any,
    on_retry: RetryCallbackType | None = None,
    **kwargs: Any,
) -> T:
    """
    Execute a Notion API call with retry logic and exponential backoff.

    Up to MAX_RETRIES attempts are made. Only APIResponseError instances
    whose status is in RETRYABLE_STATUS_CODES are retried:
    - 429 rate_limited: waits for the Retry-After header value when it is a
      finite, non-negative number of seconds; otherwise uses the current
      backoff delay
    - 500 / 502 / 503 / 504: waits for the current backoff delay, capped at
      MAX_RETRY_DELAY

    The backoff delay starts at BASE_RETRY_DELAY and doubles after every
    wait, never exceeding MAX_RETRY_DELAY.

    Args:
        api_func: The async Notion API function to call
        *args: Positional arguments to pass to the API function
        on_retry: Optional callback awaited before each wait.
            Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds)
            retry_reason is one of: 'rate_limit', 'server_error', 'timeout'.
            Exceptions raised by the callback are logged and ignored.
        **kwargs: Keyword arguments to pass to the API function

    Returns:
        The result from the API call

    Raises:
        APIResponseError: If all attempts are exhausted or the error is not
            retryable
        RuntimeError: Only if the loop body never ran (MAX_RETRIES <= 0)
    """
    last_exception: APIResponseError | None = None
    retry_delay = BASE_RETRY_DELAY

    for attempt in range(MAX_RETRIES):
        try:
            return await api_func(*args, **kwargs)

        except APIResponseError as e:
            last_exception = e

            # Not retryable (e.g., 400, 401, 403, 404) - raise immediately
            if e.status not in RETRYABLE_STATUS_CODES:
                raise

            # Final attempt failed: give up and surface the last error
            if attempt == MAX_RETRIES - 1:
                logger.error(
                    f"Notion API call failed after {MAX_RETRIES} attempts. "
                    f"Last error: {e.status} {e.code}"
                )
                raise

            # Determine retry reason and wait time based on status code
            if e.status == 429:
                retry_reason = "rate_limit"
                wait_time = retry_delay
                retry_after = e.headers.get("Retry-After") if e.headers else None
                if retry_after:
                    try:
                        requested_wait = float(retry_after)
                    except (ValueError, TypeError):
                        requested_wait = None
                    # Reject negative, NaN and infinite values: handing them
                    # to asyncio.sleep() could hang the sync or raise.
                    # NaN fails both comparisons, so it is rejected too.
                    if requested_wait is not None and (
                        0 <= requested_wait < float("inf")
                    ):
                        wait_time = requested_wait
                logger.warning(
                    f"Notion API rate limited (429). "
                    f"Waiting {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}"
                )
            elif e.status == 504:
                # Gateway timeout
                retry_reason = "timeout"
                wait_time = min(retry_delay, MAX_RETRY_DELAY)
                logger.warning(
                    f"Notion API timeout ({e.status}). "
                    f"Retrying in {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}"
                )
            else:
                # Server error (500/502/503) - use exponential backoff
                retry_reason = "server_error"
                wait_time = min(retry_delay, MAX_RETRY_DELAY)
                logger.warning(
                    f"Notion API error {e.status} ({e.code}). "
                    f"Retrying in {wait_time}s. Attempt {attempt + 1}/{MAX_RETRIES}"
                )

            # Notify before sleeping so the user sees the message while we wait
            if on_retry:
                try:
                    await on_retry(
                        retry_reason,
                        attempt + 1,  # 1-based for display
                        MAX_RETRIES,
                        wait_time,
                    )
                except Exception as callback_error:
                    # Don't let callback errors break the retry logic
                    logger.warning(f"Retry callback failed: {callback_error}")

            # Wait before retrying
            await asyncio.sleep(wait_time)

            # Exponential backoff for next attempt
            retry_delay = min(retry_delay * 2, MAX_RETRY_DELAY)

    # Only reachable when the loop body never returned or raised
    if last_exception:
        raise last_exception
    raise RuntimeError("Unexpected state in retry logic")
|
||||
|
||||
async def close(self):
    """Close the cached async Notion client, if any, and forget it."""
    client = self._notion_client
    if not client:
        # Nothing was opened (or it was already closed)
        return
    await client.aclose()
    self._notion_client = None
|
||||
|
||||
def get_pages_with_skipped_content(self) -> list[str]:
    """
    Return the titles of pages that had unsupported content skipped.

    Returns:
        The connector's internal list of page titles (not a copy)
    """
    skipped_titles = self._pages_with_skipped_content
    return skipped_titles
|
||||
|
||||
def get_skipped_content_count(self) -> int:
    """
    Count the pages that had unsupported content skipped.

    Returns:
        Length of the recorded list of page titles with skipped content
    """
    skipped_titles = self._pages_with_skipped_content
    return len(skipped_titles)
|
||||
|
||||
def is_using_legacy_token(self) -> bool:
    """
    Report whether the connector fell back to the legacy integration token.

    Returns:
        True if using legacy NOTION_INTEGRATION_TOKEN, False if using OAuth
    """
    legacy_flag = self._using_legacy_token
    return legacy_flag
|
||||
|
||||
def _record_skipped_content(self, page_title: str):
|
||||
"""
|
||||
Record that a page had unsupported content skipped.
|
||||
|
||||
Args:
|
||||
page_title: Title of the page with skipped content
|
||||
"""
|
||||
if page_title not in self._pages_with_skipped_content:
|
||||
self._pages_with_skipped_content.append(page_title)
|
||||
|
||||
async def __aenter__(self):
|
||||
"""Async context manager entry."""
|
||||
return self
|
||||
|
|
@ -186,7 +437,7 @@ class NotionHistoryConnector:
|
|||
|
||||
# Build the filter for the search
|
||||
# Note: Notion API requires specific filter structure
|
||||
search_params = {}
|
||||
search_params: dict[str, Any] = {}
|
||||
|
||||
# Filter for pages only (not databases)
|
||||
search_params["filter"] = {"value": "page", "property": "object"}
|
||||
|
|
@ -214,29 +465,53 @@ class NotionHistoryConnector:
|
|||
cursor = None
|
||||
|
||||
while has_more:
|
||||
if cursor:
|
||||
search_params["start_cursor"] = cursor
|
||||
try:
|
||||
if cursor:
|
||||
search_params["start_cursor"] = cursor
|
||||
|
||||
search_results = await notion.search(**search_params)
|
||||
# Use retry wrapper for search API call
|
||||
search_results = await self._api_call_with_retry(
|
||||
notion.search, on_retry=self._on_retry_callback, **search_params
|
||||
)
|
||||
|
||||
pages.extend(search_results["results"])
|
||||
has_more = search_results.get("has_more", False)
|
||||
pages.extend(search_results["results"])
|
||||
has_more = search_results.get("has_more", False)
|
||||
|
||||
if has_more:
|
||||
cursor = search_results.get("next_cursor")
|
||||
if has_more:
|
||||
cursor = search_results.get("next_cursor")
|
||||
|
||||
except APIResponseError as e:
|
||||
error_message = str(e)
|
||||
# Handle invalid cursor - stop pagination gracefully
|
||||
if "start_cursor provided is invalid" in error_message:
|
||||
logger.warning(
|
||||
f"Invalid pagination cursor encountered. "
|
||||
f"Continuing with {len(pages)} pages already fetched."
|
||||
)
|
||||
has_more = False
|
||||
continue
|
||||
# Re-raise other errors
|
||||
raise
|
||||
|
||||
all_page_data = []
|
||||
|
||||
for page in pages:
|
||||
page_id = page["id"]
|
||||
page_title = self.get_page_title(page)
|
||||
|
||||
# Get detailed page information
|
||||
page_content = await self.get_page_content(page_id)
|
||||
# Get detailed page information (pass title for skip tracking)
|
||||
page_content, had_skipped_content = await self.get_page_content(
|
||||
page_id, page_title
|
||||
)
|
||||
|
||||
# Record if this page had skipped content
|
||||
if had_skipped_content:
|
||||
self._record_skipped_content(page_title)
|
||||
|
||||
all_page_data.append(
|
||||
{
|
||||
"page_id": page_id,
|
||||
"title": self.get_page_title(page),
|
||||
"title": page_title,
|
||||
"content": page_content,
|
||||
}
|
||||
)
|
||||
|
|
@ -265,46 +540,93 @@ class NotionHistoryConnector:
|
|||
# If no title found, return the page ID as fallback
|
||||
return f"Untitled page ({page['id']})"
|
||||
|
||||
async def get_page_content(
    self, page_id: str, page_title: str | None = None
) -> tuple[list, bool]:
    """
    Fetch every block of a page and process each one.

    Block listing is paginated and goes through the retry wrapper. If the
    listing fails with an "unsupported block type" or "Could not find block"
    error, pagination stops and the blocks fetched so far are still
    processed; any other API error is re-raised.

    Args:
        page_id (str): The ID of the page to fetch
        page_title (str, optional): Title of the page; accepted from callers
            but not read inside this method

    Returns:
        tuple: (List of processed blocks, bool indicating if content was skipped)
    """
    notion = await self._get_client()

    raw_blocks = []
    next_cursor = None
    skipped_blocks_count = 0
    had_skipped_content = False

    # Walk the pages of results until Notion reports there are no more
    while True:
        request_kwargs = {"block_id": page_id}
        if next_cursor:
            request_kwargs["start_cursor"] = next_cursor

        try:
            # Use retry wrapper for blocks.children.list API call
            response = await self._api_call_with_retry(
                notion.blocks.children.list,
                on_retry=self._on_retry_callback,
                **request_kwargs,
            )
            raw_blocks.extend(response["results"])
            if not response["has_more"]:
                break
            next_cursor = response["next_cursor"]

        except APIResponseError as e:
            error_message = str(e)

            if any(err in error_message for err in UNSUPPORTED_BLOCK_TYPE_ERRORS):
                logger.warning(
                    f"Skipping page blocks due to unsupported block type in page {page_id}: {error_message}"
                )
                skipped_blocks_count += 1
                had_skipped_content = True
                # Keep whatever was fetched before the failure
                break

            if "Could not find block" in error_message:
                logger.warning(
                    f"Block not found in page {page_id}, continuing with available blocks: {error_message}"
                )
                break

            # Any other API error (after retry exhaustion) is fatal here
            raise

    if skipped_blocks_count > 0:
        logger.info(
            f"Page {page_id}: Skipped {skipped_blocks_count} unsupported block sections, "
            f"successfully processed {len(raw_blocks)} blocks"
        )

    # Process each block (process_block handles nested children)
    processed_blocks = []
    for raw_block in raw_blocks:
        processed, skipped_inside = await self.process_block(raw_block)
        if processed:  # Only keep blocks that were processed successfully
            processed_blocks.append(processed)
        if skipped_inside:
            had_skipped_content = True

    return processed_blocks, had_skipped_content
|
||||
|
||||
async def process_block(self, block):
|
||||
async def process_block(self, block) -> tuple[dict | None, bool]:
|
||||
"""
|
||||
Processes a block and recursively fetches any child blocks.
|
||||
|
||||
|
|
@ -312,12 +634,28 @@ class NotionHistoryConnector:
|
|||
block (dict): The block to process
|
||||
|
||||
Returns:
|
||||
dict: Processed block with content and children
|
||||
tuple: (Processed block dict or None, bool indicating if content was skipped)
|
||||
"""
|
||||
notion = await self._get_client()
|
||||
|
||||
block_id = block["id"]
|
||||
block_type = block["type"]
|
||||
had_skipped_content = False
|
||||
|
||||
# Check if this is a known unsupported block type before processing
|
||||
if block_type in UNSUPPORTED_BLOCK_TYPES:
|
||||
logger.debug(
|
||||
f"Skipping unsupported block type: {block_type} (block_id: {block_id})"
|
||||
)
|
||||
return (
|
||||
{
|
||||
"id": block_id,
|
||||
"type": block_type,
|
||||
"content": f"[{block_type} block - not supported by Notion API]",
|
||||
"children": [],
|
||||
},
|
||||
True, # Content was skipped
|
||||
)
|
||||
|
||||
# Extract block content based on its type
|
||||
content = self.extract_block_content(block)
|
||||
|
|
@ -327,17 +665,50 @@ class NotionHistoryConnector:
|
|||
child_blocks = []
|
||||
|
||||
if has_children:
|
||||
# Fetch and process child blocks
|
||||
children_response = await notion.blocks.children.list(block_id=block_id)
|
||||
for child_block in children_response["results"]:
|
||||
child_blocks.append(await self.process_block(child_block))
|
||||
try:
|
||||
# Use retry wrapper for blocks.children.list API call
|
||||
children_response = await self._api_call_with_retry(
|
||||
notion.blocks.children.list,
|
||||
on_retry=self._on_retry_callback,
|
||||
block_id=block_id,
|
||||
)
|
||||
for child_block in children_response["results"]:
|
||||
processed_child, child_had_skips = await self.process_block(
|
||||
child_block
|
||||
)
|
||||
if processed_child:
|
||||
child_blocks.append(processed_child)
|
||||
if child_had_skips:
|
||||
had_skipped_content = True
|
||||
except APIResponseError as e:
|
||||
error_message = str(e)
|
||||
# Check if this is an unsupported block type error
|
||||
if any(
|
||||
err in error_message for err in UNSUPPORTED_BLOCK_TYPE_ERRORS
|
||||
):
|
||||
logger.warning(
|
||||
f"Skipping children of block {block_id} due to unsupported block type: {error_message}"
|
||||
)
|
||||
had_skipped_content = True
|
||||
# Continue without children instead of failing
|
||||
elif "Could not find block" in error_message:
|
||||
logger.warning(
|
||||
f"Block {block_id} children not accessible, skipping: {error_message}"
|
||||
)
|
||||
# Continue without children
|
||||
else:
|
||||
# Re-raise other API errors (after retry exhaustion)
|
||||
raise
|
||||
|
||||
return {
|
||||
"id": block_id,
|
||||
"type": block_type,
|
||||
"content": content,
|
||||
"children": child_blocks,
|
||||
}
|
||||
return (
|
||||
{
|
||||
"id": block_id,
|
||||
"type": block_type,
|
||||
"content": content,
|
||||
"children": child_blocks,
|
||||
},
|
||||
had_skipped_content,
|
||||
)
|
||||
|
||||
def extract_block_content(self, block):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -1149,6 +1149,7 @@ async def _run_indexing_with_notifications(
|
|||
end_date: str,
|
||||
indexing_function,
|
||||
update_timestamp_func=None,
|
||||
supports_retry_callback: bool = False,
|
||||
):
|
||||
"""
|
||||
Generic helper to run indexing with real-time notifications.
|
||||
|
|
@ -1162,10 +1163,14 @@ async def _run_indexing_with_notifications(
|
|||
end_date: End date for indexing
|
||||
indexing_function: Async function that performs the indexing
|
||||
update_timestamp_func: Optional function to update connector timestamp
|
||||
supports_retry_callback: Whether the indexing function supports on_retry_callback
|
||||
"""
|
||||
from uuid import UUID
|
||||
|
||||
notification = None
|
||||
# Track indexed count for retry notifications
|
||||
current_indexed_count = 0
|
||||
|
||||
try:
|
||||
# Get connector info for notification
|
||||
connector_result = await session.execute(
|
||||
|
|
@ -1199,17 +1204,47 @@ async def _run_indexing_with_notifications(
|
|||
stage="fetching",
|
||||
)
|
||||
|
||||
# Create retry callback for connectors that support it
|
||||
async def on_retry_callback(
|
||||
retry_reason: str, attempt: int, max_attempts: int, wait_seconds: float
|
||||
) -> None:
|
||||
"""Callback to update notification during API retries (rate limits, etc.)"""
|
||||
nonlocal notification
|
||||
if notification:
|
||||
try:
|
||||
await session.refresh(notification)
|
||||
await NotificationService.connector_indexing.notify_retry_progress(
|
||||
session=session,
|
||||
notification=notification,
|
||||
indexed_count=current_indexed_count,
|
||||
retry_reason=retry_reason,
|
||||
attempt=attempt,
|
||||
max_attempts=max_attempts,
|
||||
wait_seconds=wait_seconds,
|
||||
)
|
||||
await session.commit()
|
||||
except Exception as e:
|
||||
# Don't let notification errors break the indexing
|
||||
logger.warning(f"Failed to update retry notification: {e}")
|
||||
|
||||
# Build kwargs for indexing function
|
||||
indexing_kwargs = {
|
||||
"session": session,
|
||||
"connector_id": connector_id,
|
||||
"search_space_id": search_space_id,
|
||||
"user_id": user_id,
|
||||
"start_date": start_date,
|
||||
"end_date": end_date,
|
||||
"update_last_indexed": False,
|
||||
}
|
||||
|
||||
# Add retry callback for connectors that support it
|
||||
if supports_retry_callback:
|
||||
indexing_kwargs["on_retry_callback"] = on_retry_callback
|
||||
|
||||
# Run the indexing function
|
||||
# Some indexers return (indexed, error), others return (indexed, skipped, error)
|
||||
result = await indexing_function(
|
||||
session=session,
|
||||
connector_id=connector_id,
|
||||
search_space_id=search_space_id,
|
||||
user_id=user_id,
|
||||
start_date=start_date,
|
||||
end_date=end_date,
|
||||
update_last_indexed=False,
|
||||
)
|
||||
result = await indexing_function(**indexing_kwargs)
|
||||
|
||||
# Handle both 2-tuple and 3-tuple returns for backwards compatibility
|
||||
if len(result) == 3:
|
||||
|
|
@ -1290,8 +1325,15 @@ async def _run_indexing_with_notifications(
|
|||
"no " in error_or_warning_lower
|
||||
and "found" in error_or_warning_lower
|
||||
)
|
||||
# Informational warnings - sync succeeded but some content couldn't be synced
|
||||
# These are NOT errors, just notifications about API limitations or recommendations
|
||||
is_info_warning = (
|
||||
"couldn't be synced" in error_or_warning_lower
|
||||
or "using legacy token" in error_or_warning_lower
|
||||
or "(api limitation)" in error_or_warning_lower
|
||||
)
|
||||
|
||||
if is_duplicate_warning or is_empty_result:
|
||||
if is_duplicate_warning or is_empty_result or is_info_warning:
|
||||
# These are success cases - sync worked, just found nothing new
|
||||
logger.info(f"Indexing completed successfully: {error_or_warning}")
|
||||
# Still update timestamp so ElectricSQL syncs and clears "Syncing" UI
|
||||
|
|
@ -1396,6 +1438,7 @@ async def run_notion_indexing_with_new_session(
|
|||
end_date=end_date,
|
||||
indexing_function=index_notion_pages,
|
||||
update_timestamp_func=_update_connector_timestamp_by_id,
|
||||
supports_retry_callback=True, # Notion connector supports retry notifications
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -1427,6 +1470,7 @@ async def run_notion_indexing(
|
|||
end_date=end_date,
|
||||
indexing_function=index_notion_pages,
|
||||
update_timestamp_func=_update_connector_timestamp_by_id,
|
||||
supports_retry_callback=True, # Notion connector supports retry notifications
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -329,6 +329,92 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler):
|
|||
metadata_updates=metadata_updates,
|
||||
)
|
||||
|
||||
async def notify_retry_progress(
    self,
    session: AsyncSession,
    notification: Notification,
    indexed_count: int,
    retry_reason: str,
    attempt: int,
    max_attempts: int,
    wait_seconds: float | None = None,
    service_name: str | None = None,
) -> Notification:
    """
    Update a notification while a connector waits to retry an API call.

    Builds a message that attributes the delay to the external service,
    optionally mentions the wait time and the number of items synced so far,
    and passes it with retry metadata to ``update_notification`` with status
    "in_progress".

    Args:
        session: Database session
        notification: Notification to update
        indexed_count: Number of items indexed so far
        retry_reason: Reason for retry ('rate_limit', 'server_error',
            'timeout', 'temporary_error'); other values get a generic message
        attempt: Current retry attempt number (1-based)
        max_attempts: Maximum number of retry attempts
        wait_seconds: Seconds to wait before retry; only shown when above 5
        service_name: Name of the external service (e.g., 'Notion', 'Slack').
            If not provided, taken from the notification metadata key
            "connector_name" (default "Service")

    Returns:
        Updated notification
    """
    if not service_name:
        service_name = notification.notification_metadata.get(
            "connector_name", "Service"
        )
        # "Notion - My Workspace" -> "Notion"
        if " - " in service_name:
            service_name = service_name.split(" - ")[0]

    # Wording that makes it clear the delay comes from the external service
    reason_phrases = {
        "rate_limit": "rate limit reached",
        "server_error": "is slow to respond",
        "timeout": "took too long",
        "temporary_error": "temporarily unavailable",
    }
    if retry_reason in reason_phrases:
        base_message = f"{service_name} {reason_phrases[retry_reason]}"
    else:
        base_message = f"Waiting for {service_name}"

    # Only mention the wait time when it is long enough to matter
    show_wait = bool(wait_seconds and wait_seconds > 5)
    if show_wait:
        retry_text = f"Retrying in {int(wait_seconds)}s..."
    else:
        retry_text = "Retrying..."
    message = f"{base_message}. {retry_text}"

    # Append progress so far, if any
    if indexed_count > 0:
        item_text = "item" if indexed_count == 1 else "items"
        message = f"{message} ({indexed_count} {item_text} synced so far)"

    metadata_updates = {
        "indexed_count": indexed_count,
        "sync_stage": "waiting_retry",
        "retry_attempt": attempt,
        "retry_max_attempts": max_attempts,
        "retry_reason": retry_reason,
        "retry_wait_seconds": wait_seconds,
    }

    return await self.update_notification(
        session=session,
        notification=notification,
        message=message,
        status="in_progress",
        metadata_updates=metadata_updates,
    )
|
||||
|
||||
async def notify_indexing_completed(
|
||||
self,
|
||||
session: AsyncSession,
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
Notion connector indexer.
|
||||
"""
|
||||
|
||||
from collections.abc import Awaitable, Callable
|
||||
from datetime import datetime
|
||||
|
||||
from sqlalchemy.exc import SQLAlchemyError
|
||||
|
|
@ -29,6 +30,10 @@ from .base import (
|
|||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
# Type alias for retry callback
|
||||
# Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None
|
||||
RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]]
|
||||
|
||||
|
||||
async def index_notion_pages(
|
||||
session: AsyncSession,
|
||||
|
|
@ -38,6 +43,7 @@ async def index_notion_pages(
|
|||
start_date: str | None = None,
|
||||
end_date: str | None = None,
|
||||
update_last_indexed: bool = True,
|
||||
on_retry_callback: RetryCallbackType | None = None,
|
||||
) -> tuple[int, str | None]:
|
||||
"""
|
||||
Index Notion pages from all accessible pages.
|
||||
|
|
@ -50,6 +56,9 @@ async def index_notion_pages(
|
|||
start_date: Start date for indexing (YYYY-MM-DD format)
|
||||
end_date: End date for indexing (YYYY-MM-DD format)
|
||||
update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
|
||||
on_retry_callback: Optional callback for retry progress notifications.
|
||||
Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds)
|
||||
retry_reason is one of: 'rate_limit', 'server_error', 'timeout'
|
||||
|
||||
Returns:
|
||||
Tuple containing (number of documents indexed, error message or None)
|
||||
|
|
@ -139,6 +148,10 @@ async def index_notion_pages(
|
|||
session=session, connector_id=connector_id
|
||||
)
|
||||
|
||||
# Set retry callback if provided (for user notifications during rate limits)
|
||||
if on_retry_callback:
|
||||
notion_client.set_retry_callback(on_retry_callback)
|
||||
|
||||
logger.info(f"Fetching Notion pages from {start_date_iso} to {end_date_iso}")
|
||||
|
||||
await task_logger.log_task_progress(
|
||||
|
|
@ -157,6 +170,20 @@ async def index_notion_pages(
|
|||
start_date=start_date_iso, end_date=end_date_iso
|
||||
)
|
||||
logger.info(f"Found {len(pages)} Notion pages")
|
||||
|
||||
# Get count of pages that had unsupported content skipped
|
||||
pages_with_skipped_content = notion_client.get_skipped_content_count()
|
||||
if pages_with_skipped_content > 0:
|
||||
logger.info(
|
||||
f"{pages_with_skipped_content} pages had Notion AI content skipped (not available via API)"
|
||||
)
|
||||
|
||||
# Check if using legacy integration token and log warning
|
||||
if notion_client.is_using_legacy_token():
|
||||
logger.warning(
|
||||
f"Connector {connector_id} is using legacy integration token. "
|
||||
"Recommend reconnecting with OAuth."
|
||||
)
|
||||
except Exception as e:
|
||||
await task_logger.log_task_failure(
|
||||
log_entry,
|
||||
|
|
@ -171,12 +198,13 @@ async def index_notion_pages(
|
|||
if not pages:
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
f"No Notion pages found for connector {connector_id}",
|
||||
f"No Notion pages found for connector {connector_id}. "
|
||||
"Ensure pages are shared with the Notion integration.",
|
||||
{"pages_found": 0},
|
||||
)
|
||||
logger.info("No Notion pages found to index")
|
||||
await notion_client.close()
|
||||
return 0, "No Notion pages found"
|
||||
return 0, None # Success with 0 pages, not an error
|
||||
|
||||
# Track the number of documents indexed
|
||||
documents_indexed = 0
|
||||
|
|
@ -454,13 +482,23 @@ async def index_notion_pages(
|
|||
logger.info(f"Final commit: Total {documents_indexed} documents processed")
|
||||
await session.commit()
|
||||
|
||||
# Prepare result message
|
||||
# Get final count of pages with skipped Notion AI content
|
||||
pages_with_skipped_ai_content = notion_client.get_skipped_content_count()
|
||||
|
||||
# Prepare result message with user-friendly notification about skipped content
|
||||
result_message = None
|
||||
if skipped_pages:
|
||||
result_message = f"Processed {total_processed} pages. Skipped {len(skipped_pages)} pages: {', '.join(skipped_pages)}"
|
||||
else:
|
||||
result_message = f"Processed {total_processed} pages."
|
||||
|
||||
# Add user-friendly message about skipped Notion AI content
|
||||
if pages_with_skipped_ai_content > 0:
|
||||
result_message += (
|
||||
" Audio transcriptions and AI summaries from Notion aren't accessible "
|
||||
"via their API - all other content was saved."
|
||||
)
|
||||
|
||||
# Log success
|
||||
await task_logger.log_task_success(
|
||||
log_entry,
|
||||
|
|
@ -470,6 +508,7 @@ async def index_notion_pages(
|
|||
"documents_indexed": documents_indexed,
|
||||
"documents_skipped": documents_skipped,
|
||||
"skipped_pages_count": len(skipped_pages),
|
||||
"pages_with_skipped_ai_content": pages_with_skipped_ai_content,
|
||||
"result_message": result_message,
|
||||
},
|
||||
)
|
||||
|
|
@ -481,10 +520,26 @@ async def index_notion_pages(
|
|||
# Clean up the async client
|
||||
await notion_client.close()
|
||||
|
||||
# Build user-friendly notification messages
|
||||
# This will be shown in the notification to inform users
|
||||
notification_parts = []
|
||||
|
||||
if pages_with_skipped_ai_content > 0:
|
||||
notification_parts.append(
|
||||
"Some Notion AI content couldn't be synced (API limitation)"
|
||||
)
|
||||
|
||||
if notion_client.is_using_legacy_token():
|
||||
notification_parts.append(
|
||||
"Using legacy token. Reconnect with OAuth for better reliability."
|
||||
)
|
||||
|
||||
user_notification_message = " ".join(notification_parts) if notification_parts else None
|
||||
|
||||
return (
|
||||
total_processed,
|
||||
None,
|
||||
) # Return None on success (result_message is for logging only)
|
||||
user_notification_message,
|
||||
)
|
||||
|
||||
except SQLAlchemyError as db_error:
|
||||
await session.rollback()
|
||||
|
|
|
|||
|
|
@ -218,7 +218,7 @@ export const IndexingConfigurationView: FC<IndexingConfigurationViewProps> = ({
|
|||
{isStartingIndexing ? (
|
||||
<>
|
||||
<Spinner size="sm" className="mr-2" />
|
||||
Starting...
|
||||
Starting
|
||||
</>
|
||||
) : (
|
||||
"Start Indexing"
|
||||
|
|
|
|||
|
|
@ -66,6 +66,29 @@ Click **Save** to apply the capabilities.
|
|||
|
||||
---
|
||||
|
||||
## Limitations & Unsupported Content
|
||||
|
||||
The Notion API cannot retrieve certain block types. SurfSense automatically skips these unsupported blocks and continues syncing all other content.
|
||||
|
||||
### Unsupported Block Types
|
||||
|
||||
The following Notion features are **not accessible via the Notion API** and will be skipped during sync:
|
||||
|
||||
- **Transcription blocks** - Audio/video transcriptions from Notion AI
|
||||
- **AI blocks** - AI-generated content blocks
|
||||
|
||||
### Learn More
|
||||
|
||||
The Notion API only supports specific block types for retrieval. The official list of **supported block types** is documented in Notion's Block reference:
|
||||
|
||||
- **[Block Object Reference](https://developers.notion.com/reference/block)** - Official documentation listing all supported block types. Any block type not listed there (such as `transcription` and `ai_block`) is not accessible via the Notion API.
|
||||
|
||||
For additional information:
|
||||
- [Working with Page Content](https://developers.notion.com/docs/working-with-page-content) - Guide on how the Notion API handles page content
|
||||
- [Notion API Reference](https://developers.notion.com/reference) - Complete API documentation
|
||||
|
||||
---
|
||||
|
||||
## Running SurfSense with Notion Connector
|
||||
|
||||
Add the Notion environment variables to your Docker run command:
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue