# Reconstructed from a mangled "git format-patch" email (the original text
# was collapsed onto a handful of physical lines):
#
#   From 5d22349dc102e3e87b42e20a923ad79df1ecae51 Mon Sep 17 00:00:00 2001
#   From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com>
#   Date: Sun, 5 Apr 2026 17:25:25 +0530
#   Subject: [PATCH] feat: implement ETL pipeline with file classification
#            and extraction services
#
# The patch creates six files (188 insertions):
#   surfsense_backend/app/etl_pipeline/__init__.py            (new, empty)
#   surfsense_backend/app/etl_pipeline/constants.py           (new)
#   surfsense_backend/app/etl_pipeline/etl_document.py        (new)
#   surfsense_backend/app/etl_pipeline/etl_pipeline_service.py (new)
#   surfsense_backend/app/etl_pipeline/exceptions.py          (new)
#   surfsense_backend/app/etl_pipeline/file_classifier.py     (new)

# --- surfsense_backend/app/etl_pipeline/constants.py ---
"""Retry and timeout tuning constants for the ETL pipeline."""

import ssl

import httpx

# Retry policy for LlamaCloud parsing calls: at most MAX_RETRIES attempts,
# with backoff delays bounded by BASE_DELAY..MAX_DELAY seconds.
LLAMACLOUD_MAX_RETRIES = 5
LLAMACLOUD_BASE_DELAY = 10
LLAMACLOUD_MAX_DELAY = 120

# Transient network/TLS failures that are worth retrying.
# NOTE(review): ConnectionError, ConnectionResetError and TimeoutError are
# all subclasses of OSError, so the trailing OSError entry already covers
# them — the explicit entries are redundant but harmless; kept so the
# intended retry surface is self-documenting.
LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
    ssl.SSLError,
    httpx.ConnectError,
    httpx.ConnectTimeout,
    httpx.ReadError,
    httpx.ReadTimeout,
    httpx.WriteError,
    httpx.WriteTimeout,
    httpx.RemoteProtocolError,
    httpx.LocalProtocolError,
    ConnectionError,
    ConnectionResetError,
    TimeoutError,
    OSError,
)

# Upload-timeout model: assume a pessimistic 100 KiB/s sustained upload
# speed; estimates are clamped to [MIN_UPLOAD_TIMEOUT, MAX_UPLOAD_TIMEOUT]
# seconds by calculate_upload_timeout below.
UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024
MIN_UPLOAD_TIMEOUT = 120
MAX_UPLOAD_TIMEOUT = 1800

# Parse-job timeout model: fixed base plus a per-page allowance (the
# per-page constant and the calculator helpers continue below).
BASE_JOB_TIMEOUT = 600
# Per-page allowance (seconds) added on top of BASE_JOB_TIMEOUT.
PER_PAGE_JOB_TIMEOUT = 60


def calculate_upload_timeout(file_size_bytes: int) -> float:
    """Estimate an upload timeout (seconds) for a file of the given size.

    Assumes the pessimistic UPLOAD_BYTES_PER_SECOND_SLOW rate with a 1.5x
    safety margin, clamped to [MIN_UPLOAD_TIMEOUT, MAX_UPLOAD_TIMEOUT].
    """
    estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
    return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))


def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
    """Estimate a parse-job timeout (seconds).

    Returns the larger of a page-count-based estimate and a size-based
    estimate (one extra minute per 10 MiB).  NOTE(review): unlike
    calculate_upload_timeout there is no upper cap here — confirm that is
    intentional for very large inputs.
    """
    page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)
    size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60
    return max(page_based_timeout, size_based_timeout)


# --- surfsense_backend/app/etl_pipeline/etl_document.py ---
from pydantic import BaseModel, field_validator


class EtlRequest(BaseModel):
    """Input to the ETL pipeline: a file on disk plus routing hints."""

    file_path: str
    filename: str
    # Page-count hint used for LlamaCloud job-timeout sizing; 0 = unknown.
    estimated_pages: int = 0

    @field_validator("filename")
    @classmethod
    def filename_must_not_be_empty(cls, v: str) -> str:
        """Reject empty or whitespace-only filenames."""
        if not v.strip():
            raise ValueError("filename must not be empty")
        return v


class EtlResult(BaseModel):
    """Output of the ETL pipeline: extracted markdown plus provenance."""

    markdown_content: str
    # Which backend produced the content, e.g. "PLAINTEXT" or "DOCLING".
    etl_service: str
    actual_pages: int = 0
    # Coarse category of the source ("plaintext", "audio", "document", ...).
    content_type: str


# --- surfsense_backend/app/etl_pipeline/etl_pipeline_service.py ---
from app.config import config as app_config
from app.etl_pipeline.etl_document import EtlRequest, EtlResult
from app.etl_pipeline.exceptions import EtlServiceUnavailableError
from app.etl_pipeline.file_classifier import FileCategory, classify_file
from app.etl_pipeline.parsers.audio import transcribe_audio
from app.etl_pipeline.parsers.direct_convert import convert_file_directly
from app.etl_pipeline.parsers.plaintext import read_plaintext


class EtlPipelineService:
    """Single pipeline for extracting markdown from files. All callers use this."""

    async def extract(self, request: EtlRequest) -> EtlResult:
        """Route *request* to the right extractor based on its filename.

        Plaintext, direct-convert (CSV/HTML) and audio files are handled by
        built-in parsers; everything else goes to the configured document
        ETL service via _extract_document.
        """
        category = classify_file(request.filename)

        if category == FileCategory.PLAINTEXT:
            content = read_plaintext(request.file_path)
            return EtlResult(
                markdown_content=content,
                etl_service="PLAINTEXT",
                content_type="plaintext",
            )

        if category == FileCategory.DIRECT_CONVERT:
            content = convert_file_directly(request.file_path, request.filename)
            return EtlResult(
                markdown_content=content,
                etl_service="DIRECT_CONVERT",
                content_type="direct_convert",
            )

        if category == FileCategory.AUDIO:
            content = await transcribe_audio(request.file_path, request.filename)
            return EtlResult(
                markdown_content=content,
                etl_service="AUDIO",
                content_type="audio",
            )

        return await self._extract_document(request)

    async def _extract_document(self, request: EtlRequest) -> EtlResult:
        """Extract a document via the ETL_SERVICE configured in app config.

        Raises:
            EtlServiceUnavailableError: if ETL_SERVICE is unset or not one
                of UNSTRUCTURED, LLAMACLOUD, DOCLING.
        """
        etl_service = app_config.ETL_SERVICE
        if not etl_service:
            raise EtlServiceUnavailableError(
                "No ETL_SERVICE configured. "
                "Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env"
            )

        # Parser imports are deferred so only the configured backend's
        # dependencies need to be installed.
        if etl_service == "DOCLING":
            from app.etl_pipeline.parsers.docling import parse_with_docling

            content = await parse_with_docling(request.file_path, request.filename)
        elif etl_service == "UNSTRUCTURED":
            from app.etl_pipeline.parsers.unstructured import parse_with_unstructured

            content = await parse_with_unstructured(request.file_path)
        elif etl_service == "LLAMACLOUD":
            from app.etl_pipeline.parsers.llamacloud import parse_with_llamacloud

            content = await parse_with_llamacloud(
                request.file_path, request.estimated_pages
            )
        else:
            raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}")

        return EtlResult(
            markdown_content=content,
            etl_service=etl_service,
            content_type="document",
        )


# --- surfsense_backend/app/etl_pipeline/exceptions.py ---
class EtlError(Exception):
    """Root of the ETL pipeline exception hierarchy.

    Added so callers can catch all pipeline failures with one type;
    the concrete exceptions below remain Exception subclasses, so
    existing handlers are unaffected.
    """


class EtlParseError(EtlError):
    """Raised when an ETL parser fails to produce content."""


class EtlServiceUnavailableError(EtlError):
    """Raised when the configured ETL_SERVICE is not recognised."""


# --- surfsense_backend/app/etl_pipeline/file_classifier.py ---
from enum import Enum
from pathlib import PurePosixPath

# Extensions read verbatim and wrapped as markdown (code, config, docs).
# NOTE(review): ".makefile" will rarely match — Makefiles usually have no
# extension at all; confirm whether bare "Makefile" should also be handled.
PLAINTEXT_EXTENSIONS = frozenset(
    {
        ".md", ".markdown", ".txt", ".text",
        ".json", ".jsonl", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf", ".xml",
        ".css", ".scss", ".less", ".sass",
        ".py", ".pyw", ".pyi", ".pyx",
        ".js", ".jsx", ".ts", ".tsx", ".mjs", ".cjs",
        ".java", ".kt", ".kts", ".scala", ".groovy",
        ".c", ".h", ".cpp", ".cxx", ".cc", ".hpp", ".hxx",
        ".cs", ".fs", ".fsx",
        ".go", ".rs", ".rb", ".php", ".pl", ".pm", ".lua", ".swift",
        ".m", ".mm", ".r", ".jl",
        ".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
        ".sql", ".graphql", ".gql",
        ".env", ".gitignore", ".dockerignore", ".editorconfig",
        ".makefile", ".cmake",
        ".log", ".rst", ".tex", ".bib", ".org", ".adoc", ".asciidoc",
        ".vue", ".svelte", ".astro",
        ".tf", ".hcl", ".proto",
    }
)

# Formats accepted by the transcription backend.
AUDIO_EXTENSIONS = frozenset(
    {".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm"}
)

# Tabular/markup formats converted to markdown without an external service.
DIRECT_CONVERT_EXTENSIONS = frozenset({".csv", ".tsv", ".html", ".htm"})


class FileCategory(Enum):
    """How a file should be routed through the ETL pipeline."""

    PLAINTEXT = "plaintext"
    AUDIO = "audio"
    DIRECT_CONVERT = "direct_convert"
    DOCUMENT = "document"


def classify_file(filename: str) -> FileCategory:
    """Classify *filename* into a FileCategory by its extension.

    Unrecognised extensions fall through to DOCUMENT (handled by the
    configured external ETL service).

    Bug fix: PurePosixPath treats a lone leading dot as part of the stem,
    so ``PurePosixPath(".env").suffix == ""`` — extensionless dotfiles such
    as ".env" and ".gitignore" never matched the PLAINTEXT_EXTENSIONS
    entries written for them and were misrouted to DOCUMENT.  For such
    names we now fall back to the full lowercase name.
    """
    path = PurePosixPath(filename)
    suffix = path.suffix.lower()
    if not suffix and path.name.startswith("."):
        # e.g. ".env", ".gitignore" — match the whole dotfile name.
        suffix = path.name.lower()
    if suffix in PLAINTEXT_EXTENSIONS:
        return FileCategory.PLAINTEXT
    if suffix in AUDIO_EXTENSIONS:
        return FileCategory.AUDIO
    if suffix in DIRECT_CONVERT_EXTENSIONS:
        return FileCategory.DIRECT_CONVERT
    return FileCategory.DOCUMENT