diff --git a/surfsense_backend/app/tasks/document_processors/__init__.py b/surfsense_backend/app/tasks/document_processors/__init__.py
index 2b5690d02..f82c10883 100644
--- a/surfsense_backend/app/tasks/document_processors/__init__.py
+++ b/surfsense_backend/app/tasks/document_processors/__init__.py
@@ -1,41 +1,17 @@
 """
 Document processors module for background tasks.
 
-This module provides a collection of document processors for different content types
-and sources. Each processor is responsible for handling a specific type of document
-processing task in the background.
-
-Available processors:
-- Extension processor: Handle documents from browser extension
-- Markdown processor: Process markdown files
-- File processors: Handle files using different ETL services (Unstructured, LlamaCloud, Docling)
-- YouTube processor: Process YouTube videos and extract transcripts
+Content extraction is handled by ``app.etl_pipeline.EtlPipelineService``.
+This package keeps orchestration (save, notify, page-limit) and
+non-ETL processors (extension, markdown, youtube).
 """
 
-# Extension processor
-# File processors (backward-compatible re-exports from _save)
-from ._save import (
-    add_received_file_document_using_docling,
-    add_received_file_document_using_llamacloud,
-    add_received_file_document_using_unstructured,
-)
 from .extension_processor import add_extension_received_document
-
-# Markdown processor
 from .markdown_processor import add_received_markdown_file_document
-
-# YouTube processor
 from .youtube_processor import add_youtube_video_document
 
 __all__ = [
-    # Extension processing
     "add_extension_received_document",
-    # File processing with different ETL services
-    "add_received_file_document_using_docling",
-    "add_received_file_document_using_llamacloud",
-    "add_received_file_document_using_unstructured",
-    # Markdown file processing
     "add_received_markdown_file_document",
-    # YouTube video processing
     "add_youtube_video_document",
 ]
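Note on the import surface: the three ETL-specific re-exports disappear along with their `._save` wrappers. What still resolves after this change, taken straight from the hunk above:

```python
# Still exported by the package:
from app.tasks.document_processors import (
    add_extension_received_document,
    add_received_markdown_file_document,
    add_youtube_video_document,
)

# Gone: the add_received_file_document_using_* names were removed from both
# __all__ and ._save, so importing them now raises ImportError:
#
#     from app.tasks.document_processors import (
#         add_received_file_document_using_docling,
#     )
```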
-""" - -import ssl -from enum import Enum - -import httpx - -# --------------------------------------------------------------------------- -# File type classification -# --------------------------------------------------------------------------- - -MARKDOWN_EXTENSIONS = (".md", ".markdown", ".txt") -AUDIO_EXTENSIONS = (".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm") -DIRECT_CONVERT_EXTENSIONS = (".csv", ".tsv", ".html", ".htm") - - -class FileCategory(Enum): - MARKDOWN = "markdown" - AUDIO = "audio" - DIRECT_CONVERT = "direct_convert" - DOCUMENT = "document" - - -def classify_file(filename: str) -> FileCategory: - """Classify a file by its extension into a processing category.""" - lower = filename.lower() - if lower.endswith(MARKDOWN_EXTENSIONS): - return FileCategory.MARKDOWN - if lower.endswith(AUDIO_EXTENSIONS): - return FileCategory.AUDIO - if lower.endswith(DIRECT_CONVERT_EXTENSIONS): - return FileCategory.DIRECT_CONVERT - return FileCategory.DOCUMENT - - -# --------------------------------------------------------------------------- -# LlamaCloud retry configuration -# --------------------------------------------------------------------------- - -LLAMACLOUD_MAX_RETRIES = 5 -LLAMACLOUD_BASE_DELAY = 10 # seconds (exponential backoff base) -LLAMACLOUD_MAX_DELAY = 120 # max delay between retries (2 minutes) -LLAMACLOUD_RETRYABLE_EXCEPTIONS = ( - ssl.SSLError, - httpx.ConnectError, - httpx.ConnectTimeout, - httpx.ReadError, - httpx.ReadTimeout, - httpx.WriteError, - httpx.WriteTimeout, - httpx.RemoteProtocolError, - httpx.LocalProtocolError, - ConnectionError, - ConnectionResetError, - TimeoutError, - OSError, -) - -# --------------------------------------------------------------------------- -# Timeout calculation constants -# --------------------------------------------------------------------------- - -UPLOAD_BYTES_PER_SECOND_SLOW = ( - 100 * 1024 -) # 100 KB/s (conservative for slow connections) -MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file -MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files -BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing -PER_PAGE_JOB_TIMEOUT = 60 # 1 minute per page for processing diff --git a/surfsense_backend/app/tasks/document_processors/_etl.py b/surfsense_backend/app/tasks/document_processors/_etl.py deleted file mode 100644 index cc3a8b1ac..000000000 --- a/surfsense_backend/app/tasks/document_processors/_etl.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -ETL parsing strategies for different document processing services. - -Provides parse functions for Unstructured, LlamaCloud, and Docling, along with -LlamaCloud retry logic and dynamic timeout calculations. 
-""" - -import asyncio -import logging -import os -import random -import warnings -from logging import ERROR, getLogger - -import httpx - -from app.config import config as app_config -from app.db import Log -from app.services.task_logging_service import TaskLoggingService - -from ._constants import ( - LLAMACLOUD_BASE_DELAY, - LLAMACLOUD_MAX_DELAY, - LLAMACLOUD_MAX_RETRIES, - LLAMACLOUD_RETRYABLE_EXCEPTIONS, - PER_PAGE_JOB_TIMEOUT, -) -from ._helpers import calculate_job_timeout, calculate_upload_timeout - -# --------------------------------------------------------------------------- -# LlamaCloud parsing with retry -# --------------------------------------------------------------------------- - - -async def parse_with_llamacloud_retry( - file_path: str, - estimated_pages: int, - task_logger: TaskLoggingService | None = None, - log_entry: Log | None = None, -): - """ - Parse a file with LlamaCloud with retry logic for transient SSL/connection errors. - - Uses dynamic timeout calculations based on file size and page count to handle - very large files reliably. - - Returns: - LlamaParse result object - - Raises: - Exception: If all retries fail - """ - from llama_cloud_services import LlamaParse - from llama_cloud_services.parse.utils import ResultType - - file_size_bytes = os.path.getsize(file_path) - file_size_mb = file_size_bytes / (1024 * 1024) - - upload_timeout = calculate_upload_timeout(file_size_bytes) - job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes) - - custom_timeout = httpx.Timeout( - connect=120.0, - read=upload_timeout, - write=upload_timeout, - pool=120.0, - ) - - logging.info( - f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, " - f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, " - f"job_timeout={job_timeout:.0f}s" - ) - - last_exception = None - attempt_errors: list[str] = [] - - for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1): - try: - async with httpx.AsyncClient(timeout=custom_timeout) as custom_client: - parser = LlamaParse( - api_key=app_config.LLAMA_CLOUD_API_KEY, - num_workers=1, - verbose=True, - language="en", - result_type=ResultType.MD, - max_timeout=int(max(2000, job_timeout + upload_timeout)), - job_timeout_in_seconds=job_timeout, - job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT, - custom_client=custom_client, - ) - result = await parser.aparse(file_path) - - if attempt > 1: - logging.info( - f"LlamaCloud upload succeeded on attempt {attempt} after " - f"{len(attempt_errors)} failures" - ) - return result - - except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e: - last_exception = e - error_type = type(e).__name__ - error_msg = str(e)[:200] - attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}") - - if attempt < LLAMACLOUD_MAX_RETRIES: - base_delay = min( - LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), - LLAMACLOUD_MAX_DELAY, - ) - jitter = base_delay * 0.25 * (2 * random.random() - 1) - delay = base_delay + jitter - - if task_logger and log_entry: - await task_logger.log_task_progress( - log_entry, - f"LlamaCloud upload failed " - f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), " - f"retrying in {delay:.0f}s", - { - "error_type": error_type, - "error_message": error_msg, - "attempt": attempt, - "retry_delay": delay, - "file_size_mb": round(file_size_mb, 1), - "upload_timeout": upload_timeout, - }, - ) - else: - logging.warning( - f"LlamaCloud upload failed " - f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): " - f"{error_type}. File: {file_size_mb:.1f}MB. 
" - f"Retrying in {delay:.0f}s..." - ) - - await asyncio.sleep(delay) - else: - logging.error( - f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} " - f"attempts. File size: {file_size_mb:.1f}MB, " - f"Pages: {estimated_pages}. " - f"Errors: {'; '.join(attempt_errors)}" - ) - - except Exception: - raise - - raise last_exception or RuntimeError( - f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. " - f"File size: {file_size_mb:.1f}MB" - ) - - -# --------------------------------------------------------------------------- -# Per-service parse functions -# --------------------------------------------------------------------------- - - -async def parse_with_unstructured(file_path: str): - """ - Parse a file using the Unstructured ETL service. - - Returns: - List of LangChain Document elements. - """ - from langchain_unstructured import UnstructuredLoader - - loader = UnstructuredLoader( - file_path, - mode="elements", - post_processors=[], - languages=["eng"], - include_orig_elements=False, - include_metadata=False, - strategy="auto", - ) - return await loader.aload() - - -async def parse_with_docling(file_path: str, filename: str) -> str: - """ - Parse a file using the Docling ETL service (via the Docling service wrapper). - - Returns: - Markdown content string. - """ - from app.services.docling_service import create_docling_service - - docling_service = create_docling_service() - - pdfminer_logger = getLogger("pdfminer") - original_level = pdfminer_logger.level - - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer") - warnings.filterwarnings( - "ignore", message=".*Cannot set gray non-stroke color.*" - ) - warnings.filterwarnings("ignore", message=".*invalid float value.*") - pdfminer_logger.setLevel(ERROR) - - try: - result = await docling_service.process_document(file_path, filename) - finally: - pdfminer_logger.setLevel(original_level) - - return result["content"] diff --git a/surfsense_backend/app/tasks/document_processors/_helpers.py b/surfsense_backend/app/tasks/document_processors/_helpers.py index 7ac05932c..9cd7b87c9 100644 --- a/surfsense_backend/app/tasks/document_processors/_helpers.py +++ b/surfsense_backend/app/tasks/document_processors/_helpers.py @@ -11,13 +11,6 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.db import Document, DocumentStatus, DocumentType from app.utils.document_converters import generate_unique_identifier_hash -from ._constants import ( - BASE_JOB_TIMEOUT, - MAX_UPLOAD_TIMEOUT, - MIN_UPLOAD_TIMEOUT, - PER_PAGE_JOB_TIMEOUT, - UPLOAD_BYTES_PER_SECOND_SLOW, -) from .base import ( check_document_by_unique_identifier, check_duplicate_document, @@ -198,21 +191,3 @@ async def update_document_from_connector( if "connector_id" in connector: document.connector_id = connector["connector_id"] await session.commit() - - -# --------------------------------------------------------------------------- -# Timeout calculations -# --------------------------------------------------------------------------- - - -def calculate_upload_timeout(file_size_bytes: int) -> float: - """Calculate upload timeout based on file size (conservative for slow connections).""" - estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5 - return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT)) - - -def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float: - """Calculate job processing timeout based on page count and file size.""" - page_based_timeout = 
diff --git a/surfsense_backend/app/tasks/document_processors/_save.py b/surfsense_backend/app/tasks/document_processors/_save.py
index 5088ad004..ae45f7a69 100644
--- a/surfsense_backend/app/tasks/document_processors/_save.py
+++ b/surfsense_backend/app/tasks/document_processors/_save.py
@@ -1,14 +1,9 @@
 """
 Unified document save/update logic for file processors.
-
-Replaces the three nearly-identical ``add_received_file_document_using_*``
-functions with a single ``save_file_document`` function plus thin wrappers
-for backward compatibility.
 """
 
 import logging
 
-from langchain_core.documents import Document as LangChainDocument
 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession
 
@@ -207,79 +202,3 @@ async def save_file_document(
         raise RuntimeError(
             f"Failed to process file document using {etl_service}: {e!s}"
         ) from e
-
-
-# ---------------------------------------------------------------------------
-# Backward-compatible wrapper functions
-# ---------------------------------------------------------------------------
-
-
-async def add_received_file_document_using_unstructured(
-    session: AsyncSession,
-    file_name: str,
-    unstructured_processed_elements: list[LangChainDocument],
-    search_space_id: int,
-    user_id: str,
-    connector: dict | None = None,
-    enable_summary: bool = True,
-) -> Document | None:
-    """Process and store a file document using the Unstructured service."""
-    from app.utils.document_converters import convert_document_to_markdown
-
-    markdown_content = await convert_document_to_markdown(
-        unstructured_processed_elements
-    )
-    return await save_file_document(
-        session,
-        file_name,
-        markdown_content,
-        search_space_id,
-        user_id,
-        "UNSTRUCTURED",
-        connector,
-        enable_summary,
-    )
-
-
-async def add_received_file_document_using_llamacloud(
-    session: AsyncSession,
-    file_name: str,
-    llamacloud_markdown_document: str,
-    search_space_id: int,
-    user_id: str,
-    connector: dict | None = None,
-    enable_summary: bool = True,
-) -> Document | None:
-    """Process and store document content parsed by LlamaCloud."""
-    return await save_file_document(
-        session,
-        file_name,
-        llamacloud_markdown_document,
-        search_space_id,
-        user_id,
-        "LLAMACLOUD",
-        connector,
-        enable_summary,
-    )
-
-
-async def add_received_file_document_using_docling(
-    session: AsyncSession,
-    file_name: str,
-    docling_markdown_document: str,
-    search_space_id: int,
-    user_id: str,
-    connector: dict | None = None,
-    enable_summary: bool = True,
-) -> Document | None:
-    """Process and store document content parsed by Docling."""
-    return await save_file_document(
-        session,
-        file_name,
-        docling_markdown_document,
-        search_space_id,
-        user_id,
-        "DOCLING",
-        connector,
-        enable_summary,
-    )
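One migration detail hides in the removed Unstructured wrapper: it was the only one of the three doing real work before delegating, flattening the LangChain elements to markdown. A minimal sketch of the replacement call site, assuming `save_file_document` keeps the positional signature the wrappers used:

```python
from app.tasks.document_processors._save import save_file_document
from app.utils.document_converters import convert_document_to_markdown


async def save_unstructured_result(  # hypothetical helper, for illustration
    session, file_name, elements, search_space_id, user_id
):
    # The step the removed wrapper performed before delegating:
    markdown_content = await convert_document_to_markdown(elements)
    return await save_file_document(
        session, file_name, markdown_content,
        search_space_id, user_id, "UNSTRUCTURED",
    )
```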