feat(backend): Enhance LlamaCloud upload resilience with dynamic timeout calculations and increased retry settings

DESKTOP-RTLN3BA\$punk 2026-01-27 17:50:45 -08:00
parent 114ac59c0e
commit b598cbeac3
2 changed files with 119 additions and 26 deletions


@@ -37,18 +37,30 @@ from .base import (
from .markdown_processor import add_received_markdown_file_document

# Constants for LlamaCloud retry configuration
LLAMACLOUD_MAX_RETRIES = 3
LLAMACLOUD_BASE_DELAY = 5  # Base delay in seconds for exponential backoff
LLAMACLOUD_MAX_RETRIES = 5  # Increased from 3 for large file resilience
LLAMACLOUD_BASE_DELAY = 10  # Base delay in seconds for exponential backoff
LLAMACLOUD_MAX_DELAY = 120  # Maximum delay between retries (2 minutes)
LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
    ssl.SSLError,
    httpx.ConnectError,
    httpx.ConnectTimeout,
    httpx.ReadTimeout,
    httpx.WriteTimeout,
    httpx.RemoteProtocolError,
    httpx.LocalProtocolError,
    ConnectionError,
    ConnectionResetError,
    TimeoutError,
    OSError,  # Catches various network-level errors
)
# Timeout calculation constants
UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024  # 100 KB/s (conservative for slow connections)
MIN_UPLOAD_TIMEOUT = 120  # Minimum 2 minutes for any file
MAX_UPLOAD_TIMEOUT = 1800  # Maximum 30 minutes for very large files
BASE_JOB_TIMEOUT = 600  # 10 minutes base for job processing
PER_PAGE_JOB_TIMEOUT = 60  # 1 minute per page for processing

def get_google_drive_unique_identifier(
    connector: dict | None,
@@ -204,6 +216,48 @@ async def find_existing_document_with_migration(
    return existing_document

def calculate_upload_timeout(file_size_bytes: int) -> float:
    """
    Calculate appropriate upload timeout based on file size.

    Assumes a conservative slow connection speed to handle worst-case scenarios.

    Args:
        file_size_bytes: Size of the file in bytes

    Returns:
        Timeout in seconds
    """
    # Calculate time needed at slow connection speed
    # Add 50% buffer for network variability and SSL overhead
    estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5

    # Clamp to reasonable bounds
    return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
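# Worked example (illustrative): a 50 MB file at the assumed 100 KB/s floor
# needs 50 * 1024 / 100 = 512 s; with the 50% buffer that is 768 s, which
# falls inside the [120, 1800] clamp:
#   calculate_upload_timeout(50 * 1024 * 1024)   # -> 768.0
#   calculate_upload_timeout(10 * 1024)          # -> 120 (minimum applies)
#   calculate_upload_timeout(500 * 1024 * 1024)  # -> 1800 (maximum applies)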
def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
    """
    Calculate job processing timeout based on page count and file size.

    Args:
        estimated_pages: Estimated number of pages
        file_size_bytes: Size of the file in bytes

    Returns:
        Timeout in seconds
    """
    # Base timeout + time per page
    page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)

    # Also consider file size (large images take longer to process)
    # ~1 minute per 10MB of file size
    size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60

    # Use the larger of the two estimates
    return max(page_based_timeout, size_based_timeout)
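# Worked example (illustrative): for a 100-page, 30 MB PDF the page-based
# estimate is 600 + 100 * 60 = 6600 s and the size-based estimate is
# 600 + 3 * 60 = 780 s, so the page-based value wins:
#   calculate_job_timeout(100, 30 * 1024 * 1024)  # -> 6600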
async def parse_with_llamacloud_retry(
    file_path: str,
    estimated_pages: int,
@@ -213,6 +267,9 @@ async def parse_with_llamacloud_retry(
"""
Parse a file with LlamaCloud with retry logic for transient SSL/connection errors.
Uses dynamic timeout calculations based on file size and page count to handle
very large files reliably.
Args:
file_path: Path to the file to parse
estimated_pages: Estimated number of pages for timeout calculation
@@ -225,25 +282,37 @@
    Raises:
        Exception: If all retries fail
    """
    import os
    import random

    from llama_cloud_services import LlamaParse
    from llama_cloud_services.parse.utils import ResultType

    # Calculate timeouts based on estimated pages
    # Base timeout of 300 seconds + 30 seconds per page for large documents
    base_timeout = 300
    per_page_timeout = 30
    job_timeout = base_timeout + (estimated_pages * per_page_timeout)
    # Create custom httpx client with larger timeouts for file uploads
    # The SSL error often occurs during large file uploads, so we need generous timeouts

    # Get file size for timeout calculations
    file_size_bytes = os.path.getsize(file_path)
    file_size_mb = file_size_bytes / (1024 * 1024)

    # Calculate dynamic timeouts based on file size and page count
    upload_timeout = calculate_upload_timeout(file_size_bytes)
    job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)

    # HTTP client timeouts - scaled based on file size
    # Write timeout is critical for large file uploads
    custom_timeout = httpx.Timeout(
        connect=60.0,  # 60 seconds to establish connection
        read=300.0,  # 5 minutes to read response
        write=300.0,  # 5 minutes to write/upload (important for large files)
        pool=60.0,  # 60 seconds to acquire connection from pool
        connect=120.0,  # 2 minutes to establish connection (handles slow DNS, etc.)
        read=upload_timeout,  # Dynamic based on file size
        write=upload_timeout,  # Dynamic based on file size (upload time)
        pool=120.0,  # 2 minutes to acquire connection from pool
    )
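    # Sketch (assumption: the construction of `custom_client` is outside this
    # hunk, but it presumably wraps the timeout built above, along the lines of):
    #     custom_client = httpx.AsyncClient(timeout=custom_timeout)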
    logging.info(
        f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
        f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
        f"job_timeout={job_timeout:.0f}s"
    )

    last_exception = None
    attempt_errors = []

    for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1):
        try:
@@ -257,46 +326,67 @@ async def parse_with_llamacloud_retry(
                language="en",
                result_type=ResultType.MD,
                # Timeout settings for large files
                max_timeout=max(2000, job_timeout),  # Overall max timeout
                max_timeout=int(max(2000, job_timeout + upload_timeout)),
                job_timeout_in_seconds=job_timeout,
                job_timeout_extra_time_per_page_in_seconds=per_page_timeout,
                job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT,
                # Use our custom client with larger timeouts
                custom_client=custom_client,
            )

            # Parse the file asynchronously
            result = await parser.aparse(file_path)

            # Success - log if we had previous failures
            if attempt > 1:
                logging.info(
                    f"LlamaCloud upload succeeded on attempt {attempt} after "
                    f"{len(attempt_errors)} failures"
                )

            return result
        except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
            last_exception = e
            error_type = type(e).__name__
            error_msg = str(e)[:200]
            attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}")

            if attempt < LLAMACLOUD_MAX_RETRIES:
                # Calculate exponential backoff delay
                delay = LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1))
                # Calculate exponential backoff with jitter
                # Base delay doubles each attempt, capped at max delay
                base_delay = min(
                    LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
                    LLAMACLOUD_MAX_DELAY,
                )
                # Add random jitter (±25%) to prevent thundering herd
                jitter = base_delay * 0.25 * (2 * random.random() - 1)
                delay = base_delay + jitter
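                # Illustration: with LLAMACLOUD_BASE_DELAY=10 and
                # LLAMACLOUD_MAX_DELAY=120, the capped backoff is 10, 20, 40,
                # 80 s for attempts 1-4 (no sleep follows the final attempt);
                # ±25% jitter puts attempt 3 anywhere in [30, 50] s.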
                if task_logger and log_entry:
                    await task_logger.log_task_progress(
                        log_entry,
                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), retrying in {delay}s",
                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), retrying in {delay:.0f}s",
                        {
                            "error_type": error_type,
                            "error_message": str(e)[:200],
                            "error_message": error_msg,
                            "attempt": attempt,
                            "retry_delay": delay,
                            "file_size_mb": round(file_size_mb, 1),
                            "upload_timeout": upload_timeout,
                        },
                    )
                else:
                    logging.warning(
                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): {error_type}. "
                        f"Retrying in {delay}s..."
                        f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): "
                        f"{error_type}. File: {file_size_mb:.1f}MB. Retrying in {delay:.0f}s..."
                    )

                await asyncio.sleep(delay)
            else:
                logging.error(
                    f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} attempts: {error_type} - {e}"
                    f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} attempts. "
                    f"File size: {file_size_mb:.1f}MB, Pages: {estimated_pages}. "
                    f"Errors: {'; '.join(attempt_errors)}"
                )
        except Exception:
@@ -304,7 +394,10 @@ async def parse_with_llamacloud_retry(
            raise

    # All retries exhausted
    raise last_exception or RuntimeError("LlamaCloud parsing failed after all retries")
    raise last_exception or RuntimeError(
        f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} attempts. "
        f"File size: {file_size_mb:.1f}MB"
    )
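# Usage sketch (illustrative; the remaining keyword parameters are elided from
# this diff, and task_logger/log_entry are inferred from the function body):
#     result = await parse_with_llamacloud_retry(
#         file_path, estimated_pages, task_logger=task_logger, log_entry=log_entry
#     )
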
async def add_received_file_document_using_unstructured(