diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py
index 0a22c20c2..5161fb569 100644
--- a/surfsense_backend/app/tasks/document_processors/file_processors.py
+++ b/surfsense_backend/app/tasks/document_processors/file_processors.py
@@ -37,18 +37,30 @@ from .base import (
 from .markdown_processor import add_received_markdown_file_document
 
 # Constants for LlamaCloud retry configuration
-LLAMACLOUD_MAX_RETRIES = 3
-LLAMACLOUD_BASE_DELAY = 5  # Base delay in seconds for exponential backoff
+LLAMACLOUD_MAX_RETRIES = 5  # Increased from 3 for large file resilience
+LLAMACLOUD_BASE_DELAY = 10  # Base delay in seconds for exponential backoff
+LLAMACLOUD_MAX_DELAY = 120  # Maximum delay between retries (2 minutes)
 LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
     ssl.SSLError,
     httpx.ConnectError,
     httpx.ConnectTimeout,
     httpx.ReadTimeout,
     httpx.WriteTimeout,
+    httpx.RemoteProtocolError,
+    httpx.LocalProtocolError,
     ConnectionError,
+    ConnectionResetError,
     TimeoutError,
+    OSError,  # Catches various network-level errors
 )
 
+# Timeout calculation constants
+UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024  # 100 KB/s (conservative for slow connections)
+MIN_UPLOAD_TIMEOUT = 120  # Minimum 2 minutes for any file
+MAX_UPLOAD_TIMEOUT = 1800  # Maximum 30 minutes for very large files
+BASE_JOB_TIMEOUT = 600  # 10 minutes base for job processing
+PER_PAGE_JOB_TIMEOUT = 60  # 1 minute per page for processing
+
 
 def get_google_drive_unique_identifier(
     connector: dict | None,
@@ -204,6 +216,48 @@ async def find_existing_document_with_migration(
     return existing_document
 
 
+def calculate_upload_timeout(file_size_bytes: int) -> float:
+    """
+    Calculate appropriate upload timeout based on file size.
+
+    Assumes a conservative slow connection speed to handle worst-case scenarios.
+
+    Args:
+        file_size_bytes: Size of the file in bytes
+
+    Returns:
+        Timeout in seconds
+    """
+    # Calculate time needed at slow connection speed
+    # Add 50% buffer for network variability and SSL overhead
+    estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
+
+    # Clamp to reasonable bounds
+    return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
+
+
+def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
+    """
+    Calculate job processing timeout based on page count and file size.
+
+    Args:
+        estimated_pages: Estimated number of pages
+        file_size_bytes: Size of the file in bytes
+
+    Returns:
+        Timeout in seconds
+    """
+    # Base timeout + time per page
+    page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)
+
+    # Also consider file size (large images take longer to process)
+    # ~1 minute per 10MB of file size
+    size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60
+
+    # Use the larger of the two estimates
+    return max(page_based_timeout, size_based_timeout)
+
+
 async def parse_with_llamacloud_retry(
     file_path: str,
     estimated_pages: int,
@@ -213,6 +267,9 @@ async def parse_with_llamacloud_retry(
     """
     Parse a file with LlamaCloud with retry logic for transient SSL/connection errors.
 
+    Uses dynamic timeout calculations based on file size and page count to handle
+    very large files reliably.
+
     Args:
         file_path: Path to the file to parse
         estimated_pages: Estimated number of pages for timeout calculation
@@ -225,25 +282,37 @@ async def parse_with_llamacloud_retry(
     Raises:
         Exception: If all retries fail
     """
+    import os
+    import random
+
     from llama_cloud_services import LlamaParse
     from llama_cloud_services.parse.utils import ResultType
 
-    # Calculate timeouts based on estimated pages
-    # Base timeout of 300 seconds + 30 seconds per page for large documents
-    base_timeout = 300
-    per_page_timeout = 30
-    job_timeout = base_timeout + (estimated_pages * per_page_timeout)
-
-    # Create custom httpx client with larger timeouts for file uploads
-    # The SSL error often occurs during large file uploads, so we need generous timeouts
+    # Get file size for timeout calculations
+    file_size_bytes = os.path.getsize(file_path)
+    file_size_mb = file_size_bytes / (1024 * 1024)
+
+    # Calculate dynamic timeouts based on file size and page count
+    upload_timeout = calculate_upload_timeout(file_size_bytes)
+    job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)
+
+    # HTTP client timeouts - scaled based on file size
+    # Write timeout is critical for large file uploads
     custom_timeout = httpx.Timeout(
-        connect=60.0,  # 60 seconds to establish connection
-        read=300.0,  # 5 minutes to read response
-        write=300.0,  # 5 minutes to write/upload (important for large files)
-        pool=60.0,  # 60 seconds to acquire connection from pool
+        connect=120.0,  # 2 minutes to establish connection (handles slow DNS, etc.)
+        read=upload_timeout,  # Dynamic based on file size
+        write=upload_timeout,  # Dynamic based on file size (upload time)
+        pool=120.0,  # 2 minutes to acquire connection from pool
+    )
+
+    logging.info(
+        f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
+        f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
+        f"job_timeout={job_timeout:.0f}s"
     )
 
     last_exception = None
+    attempt_errors = []
 
     for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1):
         try:
@@ -257,46 +326,67 @@ async def parse_with_llamacloud_retry(
                 language="en",
                 result_type=ResultType.MD,
                 # Timeout settings for large files
-                max_timeout=max(2000, job_timeout),  # Overall max timeout
+                max_timeout=int(max(2000, job_timeout + upload_timeout)),
                 job_timeout_in_seconds=job_timeout,
-                job_timeout_extra_time_per_page_in_seconds=per_page_timeout,
+                job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT,
                 # Use our custom client with larger timeouts
                 custom_client=custom_client,
             )
 
             # Parse the file asynchronously
             result = await parser.aparse(file_path)
+
+            # Success - log if we had previous failures
+            if attempt > 1:
+                logging.info(
+                    f"LlamaCloud upload succeeded on attempt {attempt} after "
+                    f"{len(attempt_errors)} failures"
+                )
+
             return result
 
         except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
             last_exception = e
             error_type = type(e).__name__
+            error_msg = str(e)[:200]
+            attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}")
 
             if attempt < LLAMACLOUD_MAX_RETRIES:
-                # Calculate exponential backoff delay
-                delay = LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1))
+                # Calculate exponential backoff with jitter
+                # Base delay doubles each attempt, capped at max delay
+                base_delay = min(
+                    LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
+                    LLAMACLOUD_MAX_DELAY
+                )
+                # Add random jitter (±25%) to prevent thundering herd
+                jitter = base_delay * 0.25 * (2 * random.random() - 1)
+                delay = base_delay + jitter
 
                 if task_logger and log_entry:
                     await task_logger.log_task_progress(
                         log_entry,
-                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), retrying in {delay}s",
+                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), retrying in {delay:.0f}s",
                         {
                             "error_type": error_type,
-                            "error_message": str(e)[:200],
+                            "error_message": error_msg,
                             "attempt": attempt,
                             "retry_delay": delay,
+                            "file_size_mb": round(file_size_mb, 1),
+                            "upload_timeout": upload_timeout,
                         },
                     )
                 else:
                     logging.warning(
-                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): {error_type}. "
-                        f"Retrying in {delay}s..."
+                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): "
+                        f"{error_type}. File: {file_size_mb:.1f}MB. Retrying in {delay:.0f}s..."
                    )
 
                 await asyncio.sleep(delay)
             else:
                 logging.error(
-                    f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} attempts: {error_type} - {e}"
+                    f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} attempts. "
+                    f"File size: {file_size_mb:.1f}MB, Pages: {estimated_pages}. "
+                    f"Errors: {'; '.join(attempt_errors)}"
                 )
 
         except Exception:
@@ -304,7 +394,10 @@ async def parse_with_llamacloud_retry(
             raise
 
     # All retries exhausted
-    raise last_exception or RuntimeError("LlamaCloud parsing failed after all retries")
+    raise last_exception or RuntimeError(
+        f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. "
+        f"File size: {file_size_mb:.1f}MB"
+    )
 
 
 async def add_received_file_document_using_unstructured(
diff --git a/surfsense_web/package.json b/surfsense_web/package.json
index 17dee6251..9c7d25378 100644
--- a/surfsense_web/package.json
+++ b/surfsense_web/package.json
@@ -86,8 +86,8 @@
 		"next-themes": "^0.4.6",
 		"pg": "^8.16.3",
 		"postgres": "^3.4.7",
-		"posthog-js": "^1.335.3",
-		"posthog-node": "^5.24.2",
+		"posthog-js": "^1.335.5",
+		"posthog-node": "^5.24.3",
 		"react": "^19.2.3",
 		"react-day-picker": "^9.8.1",
 		"react-dom": "^19.2.3",
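For reviewers, here is a standalone sketch (not part of the patch) that mirrors the constants and formulas added in file_processors.py above, showing the timeouts and pre-jitter backoff delays the new code would produce for a few representative inputs:

```python
# Illustration only -- reproduces the patch's timeout/backoff math.
UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024  # assumed worst-case throughput: 100 KB/s
MIN_UPLOAD_TIMEOUT = 120
MAX_UPLOAD_TIMEOUT = 1800
BASE_JOB_TIMEOUT = 600
PER_PAGE_JOB_TIMEOUT = 60
LLAMACLOUD_BASE_DELAY = 10
LLAMACLOUD_MAX_DELAY = 120
LLAMACLOUD_MAX_RETRIES = 5


def upload_timeout(size_bytes: int) -> float:
    # Worst-case transfer time plus a 50% buffer, clamped to [120 s, 1800 s].
    estimated = (size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
    return max(MIN_UPLOAD_TIMEOUT, min(estimated, MAX_UPLOAD_TIMEOUT))


def job_timeout(pages: int, size_bytes: int) -> float:
    # Larger of the page-based and size-based (~1 min per 10 MB) estimates.
    return max(
        BASE_JOB_TIMEOUT + pages * PER_PAGE_JOB_TIMEOUT,
        BASE_JOB_TIMEOUT + (size_bytes / (10 * 1024 * 1024)) * 60,
    )


for mb, pages in [(1, 5), (25, 100), (200, 800)]:
    size = mb * 1024 * 1024
    print(f"{mb:>3} MB, {pages:>3} pages -> "
          f"upload={upload_timeout(size):.0f}s, job={job_timeout(pages, size):.0f}s")
# 1 MB hits the 120 s floor, 25 MB -> 384 s, 200 MB hits the 1800 s cap.

# Pre-jitter backoff before retries 2..5; the 120 s cap is never reached here.
delays = [
    min(LLAMACLOUD_BASE_DELAY * 2 ** (attempt - 1), LLAMACLOUD_MAX_DELAY)
    for attempt in range(1, LLAMACLOUD_MAX_RETRIES)
]
print(delays)  # [10, 20, 40, 80]
```

The ±25% jitter term in the patch then spreads each of these delays by up to a quarter in either direction, so concurrent workers retrying against LlamaCloud do not back off in lockstep.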