From f40de6b6954c1ca286a022eebff7e994213d6f26 Mon Sep 17 00:00:00 2001
From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com>
Date: Sun, 5 Apr 2026 17:27:24 +0530
Subject: [PATCH] feat: add parsers for Docling, LlamaCloud, and Unstructured
 to ETL pipeline

---
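Reviewer note (text in this section is discarded by git am): a minimal usage
sketch for the three new entry points, assuming the caller picks a parser
based on the configured ETL service. The parse_document helper, its service
argument, and the "docling"/"llamacloud"/"unstructured" keys are hypothetical
illustration only; they are not part of this patch.

    from app.etl_pipeline.parsers.docling import parse_with_docling
    from app.etl_pipeline.parsers.llamacloud import parse_with_llamacloud
    from app.etl_pipeline.parsers.unstructured import parse_with_unstructured

    async def parse_document(
        file_path: str, filename: str, service: str, estimated_pages: int
    ) -> str:
        # Route to whichever ETL parser the deployment has configured
        # (hypothetical dispatch; service keys are illustrative).
        if service == "docling":
            return await parse_with_docling(file_path, filename)
        if service == "llamacloud":
            return await parse_with_llamacloud(file_path, estimated_pages)
        return await parse_with_unstructured(file_path)

All three coroutines return the parsed document as a single string (Markdown
in the LlamaCloud case, via ResultType.MD), so callers can treat them
interchangeably.
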
 .../app/etl_pipeline/parsers/docling.py      |  26 ++++
 .../app/etl_pipeline/parsers/llamacloud.py   | 129 ++++++++++++++++++
 .../app/etl_pipeline/parsers/unstructured.py |  14 ++
 3 files changed, 169 insertions(+)
 create mode 100644 surfsense_backend/app/etl_pipeline/parsers/docling.py
 create mode 100644 surfsense_backend/app/etl_pipeline/parsers/llamacloud.py
 create mode 100644 surfsense_backend/app/etl_pipeline/parsers/unstructured.py

diff --git a/surfsense_backend/app/etl_pipeline/parsers/docling.py b/surfsense_backend/app/etl_pipeline/parsers/docling.py
new file mode 100644
index 000000000..df0498148
--- /dev/null
+++ b/surfsense_backend/app/etl_pipeline/parsers/docling.py
@@ -0,0 +1,26 @@
+import warnings
+from logging import ERROR, getLogger
+
+
+async def parse_with_docling(file_path: str, filename: str) -> str:
+    from app.services.docling_service import create_docling_service
+
+    docling_service = create_docling_service()
+
+    pdfminer_logger = getLogger("pdfminer")
+    original_level = pdfminer_logger.level
+
+    with warnings.catch_warnings():
+        warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer")
+        warnings.filterwarnings(
+            "ignore", message=".*Cannot set gray non-stroke color.*"
+        )
+        warnings.filterwarnings("ignore", message=".*invalid float value.*")
+        pdfminer_logger.setLevel(ERROR)
+
+        try:
+            result = await docling_service.process_document(file_path, filename)
+        finally:
+            pdfminer_logger.setLevel(original_level)
+
+    return result["content"]
diff --git a/surfsense_backend/app/etl_pipeline/parsers/llamacloud.py b/surfsense_backend/app/etl_pipeline/parsers/llamacloud.py
new file mode 100644
index 000000000..5115aebea
--- /dev/null
+++ b/surfsense_backend/app/etl_pipeline/parsers/llamacloud.py
@@ -0,0 +1,129 @@
+import asyncio
+import logging
+import os
+import random
+
+import httpx
+
+from app.config import config as app_config
+from app.etl_pipeline.constants import (
+    LLAMACLOUD_BASE_DELAY,
+    LLAMACLOUD_MAX_DELAY,
+    LLAMACLOUD_MAX_RETRIES,
+    LLAMACLOUD_RETRYABLE_EXCEPTIONS,
+    PER_PAGE_JOB_TIMEOUT,
+    calculate_job_timeout,
+    calculate_upload_timeout,
+)
+
+
+async def parse_with_llamacloud(file_path: str, estimated_pages: int) -> str:
+    from llama_cloud_services import LlamaParse
+    from llama_cloud_services.parse.utils import ResultType
+
+    file_size_bytes = os.path.getsize(file_path)
+    file_size_mb = file_size_bytes / (1024 * 1024)
+
+    upload_timeout = calculate_upload_timeout(file_size_bytes)
+    job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes)
+
+    custom_timeout = httpx.Timeout(
+        connect=120.0,
+        read=upload_timeout,
+        write=upload_timeout,
+        pool=120.0,
+    )
+
+    logging.info(
+        f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, "
+        f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, "
+        f"job_timeout={job_timeout:.0f}s"
+    )
+
+    last_exception = None
+    attempt_errors: list[str] = []
+
+    for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1):
+        try:
+            async with httpx.AsyncClient(timeout=custom_timeout) as custom_client:
+                parser = LlamaParse(
+                    api_key=app_config.LLAMA_CLOUD_API_KEY,
+                    num_workers=1,
+                    verbose=True,
+                    language="en",
+                    result_type=ResultType.MD,
+                    max_timeout=int(max(2000, job_timeout + upload_timeout)),
+                    job_timeout_in_seconds=job_timeout,
+                    job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT,
+                    custom_client=custom_client,
+                )
+                result = await parser.aparse(file_path)
+
+            if attempt > 1:
+                logging.info(
+                    f"LlamaCloud upload succeeded on attempt {attempt} after "
+                    f"{len(attempt_errors)} failures"
+                )
+
+            if hasattr(result, "get_markdown_documents"):
+                markdown_docs = result.get_markdown_documents(
+                    split_by_page=False
+                )
+                if markdown_docs and hasattr(markdown_docs[0], "text"):
+                    return markdown_docs[0].text
+                if hasattr(result, "pages") and result.pages:
+                    return "\n\n".join(
+                        p.md
+                        for p in result.pages
+                        if hasattr(p, "md") and p.md
+                    )
+                return str(result)
+
+            if isinstance(result, list):
+                if result and hasattr(result[0], "text"):
+                    return result[0].text
+                return "\n\n".join(
+                    doc.page_content
+                    if hasattr(doc, "page_content")
+                    else str(doc)
+                    for doc in result
+                )
+
+            return str(result)
+
+        except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e:
+            last_exception = e
+            error_type = type(e).__name__
+            error_msg = str(e)[:200]
+            attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}")
+
+            if attempt < LLAMACLOUD_MAX_RETRIES:
+                base_delay = min(
+                    LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)),
+                    LLAMACLOUD_MAX_DELAY,
+                )
+                jitter = base_delay * 0.25 * (2 * random.random() - 1)
+                delay = base_delay + jitter
+
+                logging.warning(
+                    f"LlamaCloud upload failed "
+                    f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): "
+                    f"{error_type}. File: {file_size_mb:.1f}MB. "
+                    f"Retrying in {delay:.0f}s..."
+                )
+                await asyncio.sleep(delay)
+            else:
+                logging.error(
+                    f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} "
+                    f"attempts. File size: {file_size_mb:.1f}MB, "
+                    f"Pages: {estimated_pages}. "
+                    f"Errors: {'; '.join(attempt_errors)}"
+                )
+
+        except Exception:
+            raise
+
+    raise last_exception or RuntimeError(
+        f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. "
+        f"File size: {file_size_mb:.1f}MB"
+    )
diff --git a/surfsense_backend/app/etl_pipeline/parsers/unstructured.py b/surfsense_backend/app/etl_pipeline/parsers/unstructured.py
new file mode 100644
index 000000000..af8fb99b6
--- /dev/null
+++ b/surfsense_backend/app/etl_pipeline/parsers/unstructured.py
@@ -0,0 +1,14 @@
+async def parse_with_unstructured(file_path: str) -> str:
+    from langchain_unstructured import UnstructuredLoader
+
+    loader = UnstructuredLoader(
+        file_path,
+        mode="elements",
+        post_processors=[],
+        languages=["eng"],
+        include_orig_elements=False,
+        include_metadata=False,
+        strategy="auto",
+    )
+    docs = await loader.aload()
+    return "\n\n".join(doc.page_content for doc in docs if doc.page_content)
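
Reviewer note: the retry loop in parse_with_llamacloud uses capped
exponential backoff with +/-25% full jitter. A standalone sketch of the
delay schedule, with placeholder values (the real LLAMACLOUD_BASE_DELAY,
LLAMACLOUD_MAX_DELAY, and LLAMACLOUD_MAX_RETRIES live in
app/etl_pipeline/constants.py and may differ):

    import random

    BASE_DELAY = 2.0   # stand-in for LLAMACLOUD_BASE_DELAY (seconds)
    MAX_DELAY = 60.0   # stand-in for LLAMACLOUD_MAX_DELAY (seconds)
    MAX_RETRIES = 5    # stand-in for LLAMACLOUD_MAX_RETRIES

    for attempt in range(1, MAX_RETRIES):
        # Double the base delay each attempt, capped at MAX_DELAY.
        base = min(BASE_DELAY * (2 ** (attempt - 1)), MAX_DELAY)
        # random.random() is in [0, 1), so the jitter term spans roughly
        # -25% to +25% of the base delay.
        delay = base + base * 0.25 * (2 * random.random() - 1)
        print(f"attempt {attempt}: retry in ~{delay:.1f}s (base {base:.1f}s)")

With these placeholders the bases are 2s, 4s, 8s, and 16s before jitter; only
failed attempts short of MAX_RETRIES sleep, and once the loop is exhausted the
parser re-raises the last retryable exception instead of sleeping again.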