Merge remote-tracking branch 'upstream/dev' into feat/replace-logs

This commit is contained in:
Anish Sarkar 2026-01-15 15:52:47 +05:30
commit ab63b23f0a
24 changed files with 343 additions and 134 deletions

View file

@ -2,11 +2,14 @@
File document processors for different ETL services (Unstructured, LlamaCloud, Docling).
"""
import asyncio
import contextlib
import logging
import ssl
import warnings
from logging import ERROR, getLogger
import httpx
from fastapi import HTTPException
from langchain_core.documents import Document as LangChainDocument
from litellm import atranscription
@ -32,6 +35,122 @@ from .base import (
)
from .markdown_processor import add_received_markdown_file_document
# Constants for LlamaCloud retry configuration
LLAMACLOUD_MAX_RETRIES = 3
LLAMACLOUD_BASE_DELAY = 5 # Base delay in seconds for exponential backoff
LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
ssl.SSLError,
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadTimeout,
httpx.WriteTimeout,
ConnectionError,
TimeoutError,
)
async def parse_with_llamacloud_retry(
file_path: str,
estimated_pages: int,
task_logger: TaskLoggingService | None = None,
log_entry: Log | None = None,
):
"""
Parse a file with LlamaCloud with retry logic for transient SSL/connection errors.
Args:
file_path: Path to the file to parse
estimated_pages: Estimated number of pages for timeout calculation
task_logger: Optional task logger for progress updates
log_entry: Optional log entry for progress updates
Returns:
LlamaParse result object
Raises:
Exception: If all retries fail
"""
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
# Calculate timeouts based on estimated pages
# Base timeout of 300 seconds + 30 seconds per page for large documents
base_timeout = 300
per_page_timeout = 30
job_timeout = base_timeout + (estimated_pages * per_page_timeout)
# Create custom httpx client with larger timeouts for file uploads
# The SSL error often occurs during large file uploads, so we need generous timeouts
custom_timeout = httpx.Timeout(
connect=60.0, # 60 seconds to establish connection
read=300.0, # 5 minutes to read response
write=300.0, # 5 minutes to write/upload (important for large files)
pool=60.0, # 60 seconds to acquire connection from pool
)
last_exception = None
for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1):
try:
# Create a fresh httpx client for each attempt
async with httpx.AsyncClient(timeout=custom_timeout) as custom_client:
# Create LlamaParse parser instance with optimized settings
parser = LlamaParse(
api_key=app_config.LLAMA_CLOUD_API_KEY,
num_workers=1, # Use single worker for file processing
verbose=True,
language="en",
result_type=ResultType.MD,
# Timeout settings for large files
max_timeout=max(2000, job_timeout), # Overall max timeout
job_timeout_in_seconds=job_timeout,
job_timeout_extra_time_per_page_in_seconds=per_page_timeout,
# Use our custom client with larger timeouts
custom_client=custom_client,
)
# Parse the file asynchronously
result = await parser.aparse(file_path)
return result
except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
last_exception = e
error_type = type(e).__name__
if attempt < LLAMACLOUD_MAX_RETRIES:
# Calculate exponential backoff delay
delay = LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1))
if task_logger and log_entry:
await task_logger.log_task_progress(
log_entry,
f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), retrying in {delay}s",
{
"error_type": error_type,
"error_message": str(e)[:200],
"attempt": attempt,
"retry_delay": delay,
},
)
else:
logging.warning(
f"LlamaCloud upload failed (attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): {error_type}. "
f"Retrying in {delay}s..."
)
await asyncio.sleep(delay)
else:
logging.error(
f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} attempts: {error_type} - {e}"
)
except Exception:
# Non-retryable exception, raise immediately
raise
# All retries exhausted
raise last_exception or RuntimeError("LlamaCloud parsing failed after all retries")
async def add_received_file_document_using_unstructured(
session: AsyncSession,
@ -890,24 +1009,18 @@ async def process_file_in_background(
"file_type": "document",
"etl_service": "LLAMACLOUD",
"processing_stage": "parsing",
"estimated_pages": estimated_pages_before,
},
)
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
# Create LlamaParse parser instance
parser = LlamaParse(
api_key=app_config.LLAMA_CLOUD_API_KEY,
num_workers=1, # Use single worker for file processing
verbose=True,
language="en",
result_type=ResultType.MD,
# Parse file with retry logic for SSL/connection errors (common with large files)
result = await parse_with_llamacloud_retry(
file_path=file_path,
estimated_pages=estimated_pages_before,
task_logger=task_logger,
log_entry=log_entry,
)
# Parse the file asynchronously
result = await parser.aparse(file_path)
# Clean up the temp file
import os