diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py
index 596cd9830..307e09897 100644
--- a/surfsense_backend/app/tasks/document_processors/file_processors.py
+++ b/surfsense_backend/app/tasks/document_processors/file_processors.py
@@ -2,11 +2,14 @@
 File document processors for different ETL services (Unstructured, LlamaCloud, Docling).
 """
 
+import asyncio
 import contextlib
 import logging
+import ssl
 import warnings
 from logging import ERROR, getLogger
 
+import httpx
 from fastapi import HTTPException
 from langchain_core.documents import Document as LangChainDocument
 from litellm import atranscription
@@ -32,6 +35,123 @@ from .base import (
 from .markdown_processor import add_received_markdown_file_document
 
 
+# Constants for LlamaCloud retry configuration
+LLAMACLOUD_MAX_RETRIES = 3
+LLAMACLOUD_BASE_DELAY = 5  # Base delay in seconds for exponential backoff
+LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
+    ssl.SSLError,
+    httpx.ConnectError,
+    httpx.ConnectTimeout,
+    httpx.ReadTimeout,
+    httpx.WriteTimeout,
+    ConnectionError,
+    TimeoutError,
+)
+
+
+async def parse_with_llamacloud_retry(
+    file_path: str,
+    estimated_pages: int,
+    task_logger: TaskLoggingService | None = None,
+    log_entry: Log | None = None,
+):
+    """
+    Parse a file with LlamaCloud with retry logic for transient SSL/connection errors.
+
+    Args:
+        file_path: Path to the file to parse
+        estimated_pages: Estimated number of pages for timeout calculation
+        task_logger: Optional task logger for progress updates
+        log_entry: Optional log entry for progress updates
+
+    Returns:
+        LlamaParse result object
+
+    Raises:
+        Exception: If all retries fail
+    """
+    from llama_cloud_services import LlamaParse
+    from llama_cloud_services.parse.utils import ResultType
+
+    # Calculate timeouts based on estimated pages
+    # Base timeout of 300 seconds + 30 seconds per page for large documents
+    base_timeout = 300
+    per_page_timeout = 30
+    job_timeout = base_timeout + (estimated_pages * per_page_timeout)
+
+    # Create custom httpx client with larger timeouts for file uploads
+    # The SSL error often occurs during large file uploads, so we need generous timeouts
+    custom_timeout = httpx.Timeout(
+        connect=60.0,  # 60 seconds to establish connection
+        read=300.0,  # 5 minutes to read response
+        write=300.0,  # 5 minutes to write/upload (important for large files)
+        pool=60.0,  # 60 seconds to acquire connection from pool
+    )
+
+    last_exception = None
+
+    for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1):
+        try:
+            # Create a fresh httpx client for each attempt
+            async with httpx.AsyncClient(timeout=custom_timeout) as custom_client:
+                # Create LlamaParse parser instance with optimized settings
+                parser = LlamaParse(
+                    api_key=app_config.LLAMA_CLOUD_API_KEY,
+                    num_workers=1,  # Use single worker for file processing
+                    verbose=True,
+                    language="en",
+                    result_type=ResultType.MD,
+                    # Timeout settings for large files
+                    max_timeout=max(2000, job_timeout),  # Overall max timeout
+                    job_timeout_in_seconds=job_timeout,
+                    job_timeout_extra_time_per_page_in_seconds=per_page_timeout,
+                    # Use our custom client with larger timeouts
+                    custom_client=custom_client,
+                )
+
+                # Parse the file asynchronously
+                result = await parser.aparse(file_path)
+                return result
+
+        except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
+            last_exception = e
+            error_type = type(e).__name__
+
+            if attempt < LLAMACLOUD_MAX_RETRIES:
+                # Calculate exponential backoff delay
+                delay = LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1))
+
+                if task_logger and log_entry:
+                    await task_logger.log_task_progress(
+                        log_entry,
+                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), retrying in {delay}s",
+                        {
+                            "error_type": error_type,
+                            "error_message": str(e)[:200],
+                            "attempt": attempt,
+                            "retry_delay": delay,
+                        },
+                    )
+                else:
+                    logging.warning(
+                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): {error_type}. "
+                        f"Retrying in {delay}s..."
+                    )
+
+                await asyncio.sleep(delay)
+            else:
+                logging.error(
+                    f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} attempts: {error_type} - {e}"
+                )
+
+        except Exception:
+            # Non-retryable exception, raise immediately
+            raise
+
+    # All retries exhausted
+    raise last_exception or RuntimeError("LlamaCloud parsing failed after all retries")
+
+
 async def add_received_file_document_using_unstructured(
     session: AsyncSession,
     file_name: str,
@@ -819,24 +939,18 @@ async def process_file_in_background(
                     "file_type": "document",
                     "etl_service": "LLAMACLOUD",
                     "processing_stage": "parsing",
+                    "estimated_pages": estimated_pages_before,
                 },
             )
 
-            from llama_cloud_services import LlamaParse
-            from llama_cloud_services.parse.utils import ResultType
-
-            # Create LlamaParse parser instance
-            parser = LlamaParse(
-                api_key=app_config.LLAMA_CLOUD_API_KEY,
-                num_workers=1,  # Use single worker for file processing
-                verbose=True,
-                language="en",
-                result_type=ResultType.MD,
+            # Parse file with retry logic for SSL/connection errors (common with large files)
+            result = await parse_with_llamacloud_retry(
+                file_path=file_path,
+                estimated_pages=estimated_pages_before,
+                task_logger=task_logger,
+                log_entry=log_entry,
             )
 
-            # Parse the file asynchronously
-            result = await parser.aparse(file_path)
-
             # Clean up the temp file
             import os