refactor: consolidate document processing logic and remove unused files and ETL strategies

This commit is contained in:
Anish Sarkar 2026-04-05 17:29:24 +05:30
parent f40de6b695
commit 1248363ca9
5 changed files with 3 additions and 416 deletions

View file

@ -1,41 +1,17 @@
"""
Document processors module for background tasks.
This module provides a collection of document processors for different content types
and sources. Each processor is responsible for handling a specific type of document
processing task in the background.
Available processors:
- Extension processor: Handle documents from browser extension
- Markdown processor: Process markdown files
- File processors: Handle files using different ETL services (Unstructured, LlamaCloud, Docling)
- YouTube processor: Process YouTube videos and extract transcripts
Content extraction is handled by ``app.etl_pipeline.EtlPipelineService``.
This package keeps orchestration (save, notify, page-limit) and
non-ETL processors (extension, markdown, youtube).
"""
# Extension processor
# File processors (backward-compatible re-exports from _save)
from ._save import (
add_received_file_document_using_docling,
add_received_file_document_using_llamacloud,
add_received_file_document_using_unstructured,
)
from .extension_processor import add_extension_received_document
# Markdown processor
from .markdown_processor import add_received_markdown_file_document
# YouTube processor
from .youtube_processor import add_youtube_video_document
__all__ = [
# Extension processing
"add_extension_received_document",
# File processing with different ETL services
"add_received_file_document_using_docling",
"add_received_file_document_using_llamacloud",
"add_received_file_document_using_unstructured",
# Markdown file processing
"add_received_markdown_file_document",
# YouTube video processing
"add_youtube_video_document",
]

View file

@ -1,74 +0,0 @@
"""
Constants for file document processing.
Centralizes file type classification, LlamaCloud retry configuration,
and timeout calculation parameters.
"""
import ssl
from enum import Enum
import httpx
# ---------------------------------------------------------------------------
# File type classification
# ---------------------------------------------------------------------------
MARKDOWN_EXTENSIONS = (".md", ".markdown", ".txt")
AUDIO_EXTENSIONS = (".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")
DIRECT_CONVERT_EXTENSIONS = (".csv", ".tsv", ".html", ".htm")
class FileCategory(Enum):
MARKDOWN = "markdown"
AUDIO = "audio"
DIRECT_CONVERT = "direct_convert"
DOCUMENT = "document"
def classify_file(filename: str) -> FileCategory:
"""Classify a file by its extension into a processing category."""
lower = filename.lower()
if lower.endswith(MARKDOWN_EXTENSIONS):
return FileCategory.MARKDOWN
if lower.endswith(AUDIO_EXTENSIONS):
return FileCategory.AUDIO
if lower.endswith(DIRECT_CONVERT_EXTENSIONS):
return FileCategory.DIRECT_CONVERT
return FileCategory.DOCUMENT
# ---------------------------------------------------------------------------
# LlamaCloud retry configuration
# ---------------------------------------------------------------------------
LLAMACLOUD_MAX_RETRIES = 5
LLAMACLOUD_BASE_DELAY = 10 # seconds (exponential backoff base)
LLAMACLOUD_MAX_DELAY = 120 # max delay between retries (2 minutes)
LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
ssl.SSLError,
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadError,
httpx.ReadTimeout,
httpx.WriteError,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
httpx.LocalProtocolError,
ConnectionError,
ConnectionResetError,
TimeoutError,
OSError,
)
# ---------------------------------------------------------------------------
# Timeout calculation constants
# ---------------------------------------------------------------------------
UPLOAD_BYTES_PER_SECOND_SLOW = (
100 * 1024
) # 100 KB/s (conservative for slow connections)
MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file
MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files
BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing
PER_PAGE_JOB_TIMEOUT = 60 # 1 minute per page for processing

View file

@ -1,209 +0,0 @@
"""
ETL parsing strategies for different document processing services.
Provides parse functions for Unstructured, LlamaCloud, and Docling, along with
LlamaCloud retry logic and dynamic timeout calculations.
"""
import asyncio
import logging
import os
import random
import warnings
from logging import ERROR, getLogger
import httpx
from app.config import config as app_config
from app.db import Log
from app.services.task_logging_service import TaskLoggingService
from ._constants import (
LLAMACLOUD_BASE_DELAY,
LLAMACLOUD_MAX_DELAY,
LLAMACLOUD_MAX_RETRIES,
LLAMACLOUD_RETRYABLE_EXCEPTIONS,
PER_PAGE_JOB_TIMEOUT,
)
from ._helpers import calculate_job_timeout, calculate_upload_timeout
# ---------------------------------------------------------------------------
# LlamaCloud parsing with retry
# ---------------------------------------------------------------------------
async def parse_with_llamacloud_retry(
file_path: str,
estimated_pages: int,
task_logger: TaskLoggingService | None = None,
log_entry: Log | None = None,
):
"""
Parse a file with LlamaCloud with retry logic for transient SSL/connection errors.
Uses dynamic timeout calculations based on file size and page count to handle
very large files reliably.
Returns:
LlamaParse result object
Raises:
Exception: If all retries fail
"""
from llama_cloud_services import LlamaParse
from llama_cloud_services.parse.utils import ResultType
file_size_bytes = os.path.getsize(file_path)
file_size_mb = file_size_bytes / (1024 * 1024)
upload_timeout = calculate_upload_timeout(file_size_bytes)
job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)
custom_timeout = httpx.Timeout(
connect=120.0,
read=upload_timeout,
write=upload_timeout,
pool=120.0,
)
logging.info(
f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
f"job_timeout={job_timeout:.0f}s"
)
last_exception = None
attempt_errors: list[str] = []
for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1):
try:
async with httpx.AsyncClient(timeout=custom_timeout) as custom_client:
parser = LlamaParse(
api_key=app_config.LLAMA_CLOUD_API_KEY,
num_workers=1,
verbose=True,
language="en",
result_type=ResultType.MD,
max_timeout=int(max(2000, job_timeout + upload_timeout)),
job_timeout_in_seconds=job_timeout,
job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT,
custom_client=custom_client,
)
result = await parser.aparse(file_path)
if attempt > 1:
logging.info(
f"LlamaCloud upload succeeded on attempt {attempt} after "
f"{len(attempt_errors)} failures"
)
return result
except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
last_exception = e
error_type = type(e).__name__
error_msg = str(e)[:200]
attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}")
if attempt < LLAMACLOUD_MAX_RETRIES:
base_delay = min(
LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
LLAMACLOUD_MAX_DELAY,
)
jitter = base_delay * 0.25 * (2 * random.random() - 1)
delay = base_delay + jitter
if task_logger and log_entry:
await task_logger.log_task_progress(
log_entry,
f"LlamaCloud upload failed "
f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), "
f"retrying in {delay:.0f}s",
{
"error_type": error_type,
"error_message": error_msg,
"attempt": attempt,
"retry_delay": delay,
"file_size_mb": round(file_size_mb, 1),
"upload_timeout": upload_timeout,
},
)
else:
logging.warning(
f"LlamaCloud upload failed "
f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): "
f"{error_type}. File: {file_size_mb:.1f}MB. "
f"Retrying in {delay:.0f}s..."
)
await asyncio.sleep(delay)
else:
logging.error(
f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} "
f"attempts. File size: {file_size_mb:.1f}MB, "
f"Pages: {estimated_pages}. "
f"Errors: {'; '.join(attempt_errors)}"
)
except Exception:
raise
raise last_exception or RuntimeError(
f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. "
f"File size: {file_size_mb:.1f}MB"
)
# ---------------------------------------------------------------------------
# Per-service parse functions
# ---------------------------------------------------------------------------
async def parse_with_unstructured(file_path: str):
"""
Parse a file using the Unstructured ETL service.
Returns:
List of LangChain Document elements.
"""
from langchain_unstructured import UnstructuredLoader
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
return await loader.aload()
async def parse_with_docling(file_path: str, filename: str) -> str:
"""
Parse a file using the Docling ETL service (via the Docling service wrapper).
Returns:
Markdown content string.
"""
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
pdfminer_logger = getLogger("pdfminer")
original_level = pdfminer_logger.level
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer")
warnings.filterwarnings(
"ignore", message=".*Cannot set gray non-stroke color.*"
)
warnings.filterwarnings("ignore", message=".*invalid float value.*")
pdfminer_logger.setLevel(ERROR)
try:
result = await docling_service.process_document(file_path, filename)
finally:
pdfminer_logger.setLevel(original_level)
return result["content"]

View file

@ -11,13 +11,6 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentStatus, DocumentType
from app.utils.document_converters import generate_unique_identifier_hash
from ._constants import (
BASE_JOB_TIMEOUT,
MAX_UPLOAD_TIMEOUT,
MIN_UPLOAD_TIMEOUT,
PER_PAGE_JOB_TIMEOUT,
UPLOAD_BYTES_PER_SECOND_SLOW,
)
from .base import (
check_document_by_unique_identifier,
check_duplicate_document,
@ -198,21 +191,3 @@ async def update_document_from_connector(
if "connector_id" in connector:
document.connector_id = connector["connector_id"]
await session.commit()
# ---------------------------------------------------------------------------
# Timeout calculations
# ---------------------------------------------------------------------------
def calculate_upload_timeout(file_size_bytes: int) -> float:
"""Calculate upload timeout based on file size (conservative for slow connections)."""
estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
"""Calculate job processing timeout based on page count and file size."""
page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)
size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60
return max(page_based_timeout, size_based_timeout)

View file

@ -1,14 +1,9 @@
"""
Unified document save/update logic for file processors.
Replaces the three nearly-identical ``add_received_file_document_using_*``
functions with a single ``save_file_document`` function plus thin wrappers
for backward compatibility.
"""
import logging
from langchain_core.documents import Document as LangChainDocument
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
@ -207,79 +202,3 @@ async def save_file_document(
raise RuntimeError(
f"Failed to process file document using {etl_service}: {e!s}"
) from e
# ---------------------------------------------------------------------------
# Backward-compatible wrapper functions
# ---------------------------------------------------------------------------
async def add_received_file_document_using_unstructured(
session: AsyncSession,
file_name: str,
unstructured_processed_elements: list[LangChainDocument],
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""Process and store a file document using the Unstructured service."""
from app.utils.document_converters import convert_document_to_markdown
markdown_content = await convert_document_to_markdown(
unstructured_processed_elements
)
return await save_file_document(
session,
file_name,
markdown_content,
search_space_id,
user_id,
"UNSTRUCTURED",
connector,
enable_summary,
)
async def add_received_file_document_using_llamacloud(
session: AsyncSession,
file_name: str,
llamacloud_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""Process and store document content parsed by LlamaCloud."""
return await save_file_document(
session,
file_name,
llamacloud_markdown_document,
search_space_id,
user_id,
"LLAMACLOUD",
connector,
enable_summary,
)
async def add_received_file_document_using_docling(
session: AsyncSession,
file_name: str,
docling_markdown_document: str,
search_space_id: int,
user_id: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""Process and store document content parsed by Docling."""
return await save_file_document(
session,
file_name,
docling_markdown_document,
search_space_id,
user_id,
"DOCLING",
connector,
enable_summary,
)