From 5d22349dc102e3e87b42e20a923ad79df1ecae51 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:25:25 +0530 Subject: [PATCH 01/37] feat: implement ETL pipeline with file classification and extraction services --- .../app/etl_pipeline/__init__.py | 0 .../app/etl_pipeline/constants.py | 39 ++++++++++ .../app/etl_pipeline/etl_document.py | 21 ++++++ .../app/etl_pipeline/etl_pipeline_service.py | 73 +++++++++++++++++++ .../app/etl_pipeline/exceptions.py | 6 ++ .../app/etl_pipeline/file_classifier.py | 49 +++++++++++++ 6 files changed, 188 insertions(+) create mode 100644 surfsense_backend/app/etl_pipeline/__init__.py create mode 100644 surfsense_backend/app/etl_pipeline/constants.py create mode 100644 surfsense_backend/app/etl_pipeline/etl_document.py create mode 100644 surfsense_backend/app/etl_pipeline/etl_pipeline_service.py create mode 100644 surfsense_backend/app/etl_pipeline/exceptions.py create mode 100644 surfsense_backend/app/etl_pipeline/file_classifier.py diff --git a/surfsense_backend/app/etl_pipeline/__init__.py b/surfsense_backend/app/etl_pipeline/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/etl_pipeline/constants.py b/surfsense_backend/app/etl_pipeline/constants.py new file mode 100644 index 000000000..f65759c13 --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/constants.py @@ -0,0 +1,39 @@ +import ssl + +import httpx + +LLAMACLOUD_MAX_RETRIES = 5 +LLAMACLOUD_BASE_DELAY = 10 +LLAMACLOUD_MAX_DELAY = 120 +LLAMACLOUD_RETRYABLE_EXCEPTIONS = ( + ssl.SSLError, + httpx.ConnectError, + httpx.ConnectTimeout, + httpx.ReadError, + httpx.ReadTimeout, + httpx.WriteError, + httpx.WriteTimeout, + httpx.RemoteProtocolError, + httpx.LocalProtocolError, + ConnectionError, + ConnectionResetError, + TimeoutError, + OSError, +) + +UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024 +MIN_UPLOAD_TIMEOUT = 120 +MAX_UPLOAD_TIMEOUT = 1800 +BASE_JOB_TIMEOUT = 600 +PER_PAGE_JOB_TIMEOUT = 60 + + +def calculate_upload_timeout(file_size_bytes: int) -> float: + estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5 + return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT)) + + +def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float: + page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT) + size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60 + return max(page_based_timeout, size_based_timeout) diff --git a/surfsense_backend/app/etl_pipeline/etl_document.py b/surfsense_backend/app/etl_pipeline/etl_document.py new file mode 100644 index 000000000..350c3299f --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/etl_document.py @@ -0,0 +1,21 @@ +from pydantic import BaseModel, field_validator + + +class EtlRequest(BaseModel): + file_path: str + filename: str + estimated_pages: int = 0 + + @field_validator("filename") + @classmethod + def filename_must_not_be_empty(cls, v: str) -> str: + if not v.strip(): + raise ValueError("filename must not be empty") + return v + + +class EtlResult(BaseModel): + markdown_content: str + etl_service: str + actual_pages: int = 0 + content_type: str diff --git a/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py b/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py new file mode 100644 index 000000000..f382451df --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py @@ -0,0 +1,73 @@ +from app.config import config as app_config +from app.etl_pipeline.etl_document import EtlRequest, EtlResult +from app.etl_pipeline.exceptions import EtlServiceUnavailableError +from app.etl_pipeline.file_classifier import FileCategory, classify_file +from app.etl_pipeline.parsers.audio import transcribe_audio +from app.etl_pipeline.parsers.direct_convert import convert_file_directly +from app.etl_pipeline.parsers.plaintext import read_plaintext + + +class EtlPipelineService: + """Single pipeline for extracting markdown from files. All callers use this.""" + + async def extract(self, request: EtlRequest) -> EtlResult: + category = classify_file(request.filename) + + if category == FileCategory.PLAINTEXT: + content = read_plaintext(request.file_path) + return EtlResult( + markdown_content=content, + etl_service="PLAINTEXT", + content_type="plaintext", + ) + + if category == FileCategory.DIRECT_CONVERT: + content = convert_file_directly(request.file_path, request.filename) + return EtlResult( + markdown_content=content, + etl_service="DIRECT_CONVERT", + content_type="direct_convert", + ) + + if category == FileCategory.AUDIO: + content = await transcribe_audio(request.file_path, request.filename) + return EtlResult( + markdown_content=content, + etl_service="AUDIO", + content_type="audio", + ) + + return await self._extract_document(request) + + async def _extract_document(self, request: EtlRequest) -> EtlResult: + etl_service = app_config.ETL_SERVICE + if not etl_service: + raise EtlServiceUnavailableError( + "No ETL_SERVICE configured. " + "Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env" + ) + + if etl_service == "DOCLING": + from app.etl_pipeline.parsers.docling import parse_with_docling + + content = await parse_with_docling(request.file_path, request.filename) + elif etl_service == "UNSTRUCTURED": + from app.etl_pipeline.parsers.unstructured import parse_with_unstructured + + content = await parse_with_unstructured(request.file_path) + elif etl_service == "LLAMACLOUD": + from app.etl_pipeline.parsers.llamacloud import parse_with_llamacloud + + content = await parse_with_llamacloud( + request.file_path, request.estimated_pages + ) + else: + raise EtlServiceUnavailableError( + f"Unknown ETL_SERVICE: {etl_service}" + ) + + return EtlResult( + markdown_content=content, + etl_service=etl_service, + content_type="document", + ) diff --git a/surfsense_backend/app/etl_pipeline/exceptions.py b/surfsense_backend/app/etl_pipeline/exceptions.py new file mode 100644 index 000000000..ac8fc0172 --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/exceptions.py @@ -0,0 +1,6 @@ +class EtlParseError(Exception): + """Raised when an ETL parser fails to produce content.""" + + +class EtlServiceUnavailableError(Exception): + """Raised when the configured ETL_SERVICE is not recognised.""" diff --git a/surfsense_backend/app/etl_pipeline/file_classifier.py b/surfsense_backend/app/etl_pipeline/file_classifier.py new file mode 100644 index 000000000..40c2d5aff --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/file_classifier.py @@ -0,0 +1,49 @@ +from enum import Enum +from pathlib import PurePosixPath + + +PLAINTEXT_EXTENSIONS = frozenset( + { + ".md", ".markdown", ".txt", ".text", + ".json", ".jsonl", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf", ".xml", + ".css", ".scss", ".less", ".sass", + ".py", ".pyw", ".pyi", ".pyx", + ".js", ".jsx", ".ts", ".tsx", ".mjs", ".cjs", + ".java", ".kt", ".kts", ".scala", ".groovy", + ".c", ".h", ".cpp", ".cxx", ".cc", ".hpp", ".hxx", + ".cs", ".fs", ".fsx", + ".go", ".rs", ".rb", ".php", ".pl", ".pm", ".lua", ".swift", + ".m", ".mm", ".r", ".jl", + ".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1", + ".sql", ".graphql", ".gql", + ".env", ".gitignore", ".dockerignore", ".editorconfig", + ".makefile", ".cmake", + ".log", ".rst", ".tex", ".bib", ".org", ".adoc", ".asciidoc", + ".vue", ".svelte", ".astro", + ".tf", ".hcl", ".proto", + } +) + +AUDIO_EXTENSIONS = frozenset( + {".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm"} +) + +DIRECT_CONVERT_EXTENSIONS = frozenset({".csv", ".tsv", ".html", ".htm"}) + + +class FileCategory(Enum): + PLAINTEXT = "plaintext" + AUDIO = "audio" + DIRECT_CONVERT = "direct_convert" + DOCUMENT = "document" + + +def classify_file(filename: str) -> FileCategory: + suffix = PurePosixPath(filename).suffix.lower() + if suffix in PLAINTEXT_EXTENSIONS: + return FileCategory.PLAINTEXT + if suffix in AUDIO_EXTENSIONS: + return FileCategory.AUDIO + if suffix in DIRECT_CONVERT_EXTENSIONS: + return FileCategory.DIRECT_CONVERT + return FileCategory.DOCUMENT From 02fc6f1d1616de98a566d6925f96061a86a114db Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:26:03 +0530 Subject: [PATCH 02/37] feat: add audio transcription functionality to ETL pipeline --- .../app/etl_pipeline/parsers/__init__.py | 0 .../app/etl_pipeline/parsers/audio.py | 34 +++++++++++++++++++ 2 files changed, 34 insertions(+) create mode 100644 surfsense_backend/app/etl_pipeline/parsers/__init__.py create mode 100644 surfsense_backend/app/etl_pipeline/parsers/audio.py diff --git a/surfsense_backend/app/etl_pipeline/parsers/__init__.py b/surfsense_backend/app/etl_pipeline/parsers/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/surfsense_backend/app/etl_pipeline/parsers/audio.py b/surfsense_backend/app/etl_pipeline/parsers/audio.py new file mode 100644 index 000000000..cd49bafde --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/parsers/audio.py @@ -0,0 +1,34 @@ +from litellm import atranscription + +from app.config import config as app_config + + +async def transcribe_audio(file_path: str, filename: str) -> str: + stt_service_type = ( + "local" + if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/") + else "external" + ) + + if stt_service_type == "local": + from app.services.stt_service import stt_service + + result = stt_service.transcribe_file(file_path) + text = result.get("text", "") + if not text: + raise ValueError("Transcription returned empty text") + else: + with open(file_path, "rb") as audio_file: + kwargs: dict = { + "model": app_config.STT_SERVICE, + "file": audio_file, + "api_key": app_config.STT_SERVICE_API_KEY, + } + if app_config.STT_SERVICE_API_BASE: + kwargs["api_base"] = app_config.STT_SERVICE_API_BASE + response = await atranscription(**kwargs) + text = response.get("text", "") + if not text: + raise ValueError("Transcription returned empty text") + + return f"# Transcription of {filename}\n\n{text}" From 35582c9389a9bc08f11cd603d8d9ed635e5a6218 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:26:29 +0530 Subject: [PATCH 03/37] feat: add direct_convert module to ETL pipeline for file conversion --- surfsense_backend/app/etl_pipeline/parsers/direct_convert.py | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 surfsense_backend/app/etl_pipeline/parsers/direct_convert.py diff --git a/surfsense_backend/app/etl_pipeline/parsers/direct_convert.py b/surfsense_backend/app/etl_pipeline/parsers/direct_convert.py new file mode 100644 index 000000000..c9e6e8647 --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/parsers/direct_convert.py @@ -0,0 +1,3 @@ +from app.tasks.document_processors._direct_converters import convert_file_directly + +__all__ = ["convert_file_directly"] From 2824410be225e43d5b22335776ca009c8c1ae2d1 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:26:42 +0530 Subject: [PATCH 04/37] feat: add plaintext parser to ETL pipeline for reading text files --- surfsense_backend/app/etl_pipeline/parsers/plaintext.py | 8 ++++++++ 1 file changed, 8 insertions(+) create mode 100644 surfsense_backend/app/etl_pipeline/parsers/plaintext.py diff --git a/surfsense_backend/app/etl_pipeline/parsers/plaintext.py b/surfsense_backend/app/etl_pipeline/parsers/plaintext.py new file mode 100644 index 000000000..24bfb71e5 --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/parsers/plaintext.py @@ -0,0 +1,8 @@ +def read_plaintext(file_path: str) -> str: + with open(file_path, encoding="utf-8", errors="replace") as f: + content = f.read() + if "\x00" in content: + raise ValueError( + f"File contains null bytes — likely a binary file opened as text: {file_path}" + ) + return content From f40de6b6954c1ca286a022eebff7e994213d6f26 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:27:24 +0530 Subject: [PATCH 05/37] feat: add parsers for Docling, LlamaCloud, and Unstructured to ETL pipeline --- .../app/etl_pipeline/parsers/docling.py | 26 ++++ .../app/etl_pipeline/parsers/llamacloud.py | 129 ++++++++++++++++++ .../app/etl_pipeline/parsers/unstructured.py | 14 ++ 3 files changed, 169 insertions(+) create mode 100644 surfsense_backend/app/etl_pipeline/parsers/docling.py create mode 100644 surfsense_backend/app/etl_pipeline/parsers/llamacloud.py create mode 100644 surfsense_backend/app/etl_pipeline/parsers/unstructured.py diff --git a/surfsense_backend/app/etl_pipeline/parsers/docling.py b/surfsense_backend/app/etl_pipeline/parsers/docling.py new file mode 100644 index 000000000..df0498148 --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/parsers/docling.py @@ -0,0 +1,26 @@ +import warnings +from logging import ERROR, getLogger + + +async def parse_with_docling(file_path: str, filename: str) -> str: + from app.services.docling_service import create_docling_service + + docling_service = create_docling_service() + + pdfminer_logger = getLogger("pdfminer") + original_level = pdfminer_logger.level + + with warnings.catch_warnings(): + warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer") + warnings.filterwarnings( + "ignore", message=".*Cannot set gray non-stroke color.*" + ) + warnings.filterwarnings("ignore", message=".*invalid float value.*") + pdfminer_logger.setLevel(ERROR) + + try: + result = await docling_service.process_document(file_path, filename) + finally: + pdfminer_logger.setLevel(original_level) + + return result["content"] diff --git a/surfsense_backend/app/etl_pipeline/parsers/llamacloud.py b/surfsense_backend/app/etl_pipeline/parsers/llamacloud.py new file mode 100644 index 000000000..5115aebea --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/parsers/llamacloud.py @@ -0,0 +1,129 @@ +import asyncio +import logging +import os +import random + +import httpx + +from app.config import config as app_config +from app.etl_pipeline.constants import ( + LLAMACLOUD_BASE_DELAY, + LLAMACLOUD_MAX_DELAY, + LLAMACLOUD_MAX_RETRIES, + LLAMACLOUD_RETRYABLE_EXCEPTIONS, + PER_PAGE_JOB_TIMEOUT, + calculate_job_timeout, + calculate_upload_timeout, +) + + +async def parse_with_llamacloud(file_path: str, estimated_pages: int) -> str: + from llama_cloud_services import LlamaParse + from llama_cloud_services.parse.utils import ResultType + + file_size_bytes = os.path.getsize(file_path) + file_size_mb = file_size_bytes / (1024 * 1024) + + upload_timeout = calculate_upload_timeout(file_size_bytes) + job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes) + + custom_timeout = httpx.Timeout( + connect=120.0, + read=upload_timeout, + write=upload_timeout, + pool=120.0, + ) + + logging.info( + f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, " + f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, " + f"job_timeout={job_timeout:.0f}s" + ) + + last_exception = None + attempt_errors: list[str] = [] + + for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1): + try: + async with httpx.AsyncClient(timeout=custom_timeout) as custom_client: + parser = LlamaParse( + api_key=app_config.LLAMA_CLOUD_API_KEY, + num_workers=1, + verbose=True, + language="en", + result_type=ResultType.MD, + max_timeout=int(max(2000, job_timeout + upload_timeout)), + job_timeout_in_seconds=job_timeout, + job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT, + custom_client=custom_client, + ) + result = await parser.aparse(file_path) + + if attempt > 1: + logging.info( + f"LlamaCloud upload succeeded on attempt {attempt} after " + f"{len(attempt_errors)} failures" + ) + + if hasattr(result, "get_markdown_documents"): + markdown_docs = result.get_markdown_documents( + split_by_page=False + ) + if markdown_docs and hasattr(markdown_docs[0], "text"): + return markdown_docs[0].text + if hasattr(result, "pages") and result.pages: + return "\n\n".join( + p.md + for p in result.pages + if hasattr(p, "md") and p.md + ) + return str(result) + + if isinstance(result, list): + if result and hasattr(result[0], "text"): + return result[0].text + return "\n\n".join( + doc.page_content + if hasattr(doc, "page_content") + else str(doc) + for doc in result + ) + + return str(result) + + except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e: + last_exception = e + error_type = type(e).__name__ + error_msg = str(e)[:200] + attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}") + + if attempt < LLAMACLOUD_MAX_RETRIES: + base_delay = min( + LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), + LLAMACLOUD_MAX_DELAY, + ) + jitter = base_delay * 0.25 * (2 * random.random() - 1) + delay = base_delay + jitter + + logging.warning( + f"LlamaCloud upload failed " + f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): " + f"{error_type}. File: {file_size_mb:.1f}MB. " + f"Retrying in {delay:.0f}s..." + ) + await asyncio.sleep(delay) + else: + logging.error( + f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} " + f"attempts. File size: {file_size_mb:.1f}MB, " + f"Pages: {estimated_pages}. " + f"Errors: {'; '.join(attempt_errors)}" + ) + + except Exception: + raise + + raise last_exception or RuntimeError( + f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. " + f"File size: {file_size_mb:.1f}MB" + ) diff --git a/surfsense_backend/app/etl_pipeline/parsers/unstructured.py b/surfsense_backend/app/etl_pipeline/parsers/unstructured.py new file mode 100644 index 000000000..af8fb99b6 --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/parsers/unstructured.py @@ -0,0 +1,14 @@ +async def parse_with_unstructured(file_path: str) -> str: + from langchain_unstructured import UnstructuredLoader + + loader = UnstructuredLoader( + file_path, + mode="elements", + post_processors=[], + languages=["eng"], + include_orig_elements=False, + include_metadata=False, + strategy="auto", + ) + docs = await loader.aload() + return "\n\n".join(doc.page_content for doc in docs if doc.page_content) From 1248363ca980916cd5e16df66dbcc3cd37a2e68f Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:29:24 +0530 Subject: [PATCH 06/37] refactor: consolidate document processing logic and remove unused files and ETL strategies --- .../app/tasks/document_processors/__init__.py | 30 +-- .../tasks/document_processors/_constants.py | 74 ------- .../app/tasks/document_processors/_etl.py | 209 ------------------ .../app/tasks/document_processors/_helpers.py | 25 --- .../app/tasks/document_processors/_save.py | 81 ------- 5 files changed, 3 insertions(+), 416 deletions(-) delete mode 100644 surfsense_backend/app/tasks/document_processors/_constants.py delete mode 100644 surfsense_backend/app/tasks/document_processors/_etl.py diff --git a/surfsense_backend/app/tasks/document_processors/__init__.py b/surfsense_backend/app/tasks/document_processors/__init__.py index 2b5690d02..f82c10883 100644 --- a/surfsense_backend/app/tasks/document_processors/__init__.py +++ b/surfsense_backend/app/tasks/document_processors/__init__.py @@ -1,41 +1,17 @@ """ Document processors module for background tasks. -This module provides a collection of document processors for different content types -and sources. Each processor is responsible for handling a specific type of document -processing task in the background. - -Available processors: -- Extension processor: Handle documents from browser extension -- Markdown processor: Process markdown files -- File processors: Handle files using different ETL services (Unstructured, LlamaCloud, Docling) -- YouTube processor: Process YouTube videos and extract transcripts +Content extraction is handled by ``app.etl_pipeline.EtlPipelineService``. +This package keeps orchestration (save, notify, page-limit) and +non-ETL processors (extension, markdown, youtube). """ -# Extension processor -# File processors (backward-compatible re-exports from _save) -from ._save import ( - add_received_file_document_using_docling, - add_received_file_document_using_llamacloud, - add_received_file_document_using_unstructured, -) from .extension_processor import add_extension_received_document - -# Markdown processor from .markdown_processor import add_received_markdown_file_document - -# YouTube processor from .youtube_processor import add_youtube_video_document __all__ = [ - # Extension processing "add_extension_received_document", - # File processing with different ETL services - "add_received_file_document_using_docling", - "add_received_file_document_using_llamacloud", - "add_received_file_document_using_unstructured", - # Markdown file processing "add_received_markdown_file_document", - # YouTube video processing "add_youtube_video_document", ] diff --git a/surfsense_backend/app/tasks/document_processors/_constants.py b/surfsense_backend/app/tasks/document_processors/_constants.py deleted file mode 100644 index f74d7acce..000000000 --- a/surfsense_backend/app/tasks/document_processors/_constants.py +++ /dev/null @@ -1,74 +0,0 @@ -""" -Constants for file document processing. - -Centralizes file type classification, LlamaCloud retry configuration, -and timeout calculation parameters. -""" - -import ssl -from enum import Enum - -import httpx - -# --------------------------------------------------------------------------- -# File type classification -# --------------------------------------------------------------------------- - -MARKDOWN_EXTENSIONS = (".md", ".markdown", ".txt") -AUDIO_EXTENSIONS = (".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm") -DIRECT_CONVERT_EXTENSIONS = (".csv", ".tsv", ".html", ".htm") - - -class FileCategory(Enum): - MARKDOWN = "markdown" - AUDIO = "audio" - DIRECT_CONVERT = "direct_convert" - DOCUMENT = "document" - - -def classify_file(filename: str) -> FileCategory: - """Classify a file by its extension into a processing category.""" - lower = filename.lower() - if lower.endswith(MARKDOWN_EXTENSIONS): - return FileCategory.MARKDOWN - if lower.endswith(AUDIO_EXTENSIONS): - return FileCategory.AUDIO - if lower.endswith(DIRECT_CONVERT_EXTENSIONS): - return FileCategory.DIRECT_CONVERT - return FileCategory.DOCUMENT - - -# --------------------------------------------------------------------------- -# LlamaCloud retry configuration -# --------------------------------------------------------------------------- - -LLAMACLOUD_MAX_RETRIES = 5 -LLAMACLOUD_BASE_DELAY = 10 # seconds (exponential backoff base) -LLAMACLOUD_MAX_DELAY = 120 # max delay between retries (2 minutes) -LLAMACLOUD_RETRYABLE_EXCEPTIONS = ( - ssl.SSLError, - httpx.ConnectError, - httpx.ConnectTimeout, - httpx.ReadError, - httpx.ReadTimeout, - httpx.WriteError, - httpx.WriteTimeout, - httpx.RemoteProtocolError, - httpx.LocalProtocolError, - ConnectionError, - ConnectionResetError, - TimeoutError, - OSError, -) - -# --------------------------------------------------------------------------- -# Timeout calculation constants -# --------------------------------------------------------------------------- - -UPLOAD_BYTES_PER_SECOND_SLOW = ( - 100 * 1024 -) # 100 KB/s (conservative for slow connections) -MIN_UPLOAD_TIMEOUT = 120 # Minimum 2 minutes for any file -MAX_UPLOAD_TIMEOUT = 1800 # Maximum 30 minutes for very large files -BASE_JOB_TIMEOUT = 600 # 10 minutes base for job processing -PER_PAGE_JOB_TIMEOUT = 60 # 1 minute per page for processing diff --git a/surfsense_backend/app/tasks/document_processors/_etl.py b/surfsense_backend/app/tasks/document_processors/_etl.py deleted file mode 100644 index cc3a8b1ac..000000000 --- a/surfsense_backend/app/tasks/document_processors/_etl.py +++ /dev/null @@ -1,209 +0,0 @@ -""" -ETL parsing strategies for different document processing services. - -Provides parse functions for Unstructured, LlamaCloud, and Docling, along with -LlamaCloud retry logic and dynamic timeout calculations. -""" - -import asyncio -import logging -import os -import random -import warnings -from logging import ERROR, getLogger - -import httpx - -from app.config import config as app_config -from app.db import Log -from app.services.task_logging_service import TaskLoggingService - -from ._constants import ( - LLAMACLOUD_BASE_DELAY, - LLAMACLOUD_MAX_DELAY, - LLAMACLOUD_MAX_RETRIES, - LLAMACLOUD_RETRYABLE_EXCEPTIONS, - PER_PAGE_JOB_TIMEOUT, -) -from ._helpers import calculate_job_timeout, calculate_upload_timeout - -# --------------------------------------------------------------------------- -# LlamaCloud parsing with retry -# --------------------------------------------------------------------------- - - -async def parse_with_llamacloud_retry( - file_path: str, - estimated_pages: int, - task_logger: TaskLoggingService | None = None, - log_entry: Log | None = None, -): - """ - Parse a file with LlamaCloud with retry logic for transient SSL/connection errors. - - Uses dynamic timeout calculations based on file size and page count to handle - very large files reliably. - - Returns: - LlamaParse result object - - Raises: - Exception: If all retries fail - """ - from llama_cloud_services import LlamaParse - from llama_cloud_services.parse.utils import ResultType - - file_size_bytes = os.path.getsize(file_path) - file_size_mb = file_size_bytes / (1024 * 1024) - - upload_timeout = calculate_upload_timeout(file_size_bytes) - job_timeout = calculate_job_timeout(estimated_pages, file_size_bytes) - - custom_timeout = httpx.Timeout( - connect=120.0, - read=upload_timeout, - write=upload_timeout, - pool=120.0, - ) - - logging.info( - f"LlamaCloud upload configured: file_size={file_size_mb:.1f}MB, " - f"pages={estimated_pages}, upload_timeout={upload_timeout:.0f}s, " - f"job_timeout={job_timeout:.0f}s" - ) - - last_exception = None - attempt_errors: list[str] = [] - - for attempt in range(1, LLAMACLOUD_MAX_RETRIES + 1): - try: - async with httpx.AsyncClient(timeout=custom_timeout) as custom_client: - parser = LlamaParse( - api_key=app_config.LLAMA_CLOUD_API_KEY, - num_workers=1, - verbose=True, - language="en", - result_type=ResultType.MD, - max_timeout=int(max(2000, job_timeout + upload_timeout)), - job_timeout_in_seconds=job_timeout, - job_timeout_extra_time_per_page_in_seconds=PER_PAGE_JOB_TIMEOUT, - custom_client=custom_client, - ) - result = await parser.aparse(file_path) - - if attempt > 1: - logging.info( - f"LlamaCloud upload succeeded on attempt {attempt} after " - f"{len(attempt_errors)} failures" - ) - return result - - except LLAMACLOUD_RETRYABLE_EXCEPTIONS as e: - last_exception = e - error_type = type(e).__name__ - error_msg = str(e)[:200] - attempt_errors.append(f"Attempt {attempt}: {error_type} - {error_msg}") - - if attempt < LLAMACLOUD_MAX_RETRIES: - base_delay = min( - LLAMACLOUD_BASE_DELAY * (2 ** (attempt - 1)), - LLAMACLOUD_MAX_DELAY, - ) - jitter = base_delay * 0.25 * (2 * random.random() - 1) - delay = base_delay + jitter - - if task_logger and log_entry: - await task_logger.log_task_progress( - log_entry, - f"LlamaCloud upload failed " - f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}), " - f"retrying in {delay:.0f}s", - { - "error_type": error_type, - "error_message": error_msg, - "attempt": attempt, - "retry_delay": delay, - "file_size_mb": round(file_size_mb, 1), - "upload_timeout": upload_timeout, - }, - ) - else: - logging.warning( - f"LlamaCloud upload failed " - f"(attempt {attempt}/{LLAMACLOUD_MAX_RETRIES}): " - f"{error_type}. File: {file_size_mb:.1f}MB. " - f"Retrying in {delay:.0f}s..." - ) - - await asyncio.sleep(delay) - else: - logging.error( - f"LlamaCloud upload failed after {LLAMACLOUD_MAX_RETRIES} " - f"attempts. File size: {file_size_mb:.1f}MB, " - f"Pages: {estimated_pages}. " - f"Errors: {'; '.join(attempt_errors)}" - ) - - except Exception: - raise - - raise last_exception or RuntimeError( - f"LlamaCloud parsing failed after {LLAMACLOUD_MAX_RETRIES} retries. " - f"File size: {file_size_mb:.1f}MB" - ) - - -# --------------------------------------------------------------------------- -# Per-service parse functions -# --------------------------------------------------------------------------- - - -async def parse_with_unstructured(file_path: str): - """ - Parse a file using the Unstructured ETL service. - - Returns: - List of LangChain Document elements. - """ - from langchain_unstructured import UnstructuredLoader - - loader = UnstructuredLoader( - file_path, - mode="elements", - post_processors=[], - languages=["eng"], - include_orig_elements=False, - include_metadata=False, - strategy="auto", - ) - return await loader.aload() - - -async def parse_with_docling(file_path: str, filename: str) -> str: - """ - Parse a file using the Docling ETL service (via the Docling service wrapper). - - Returns: - Markdown content string. - """ - from app.services.docling_service import create_docling_service - - docling_service = create_docling_service() - - pdfminer_logger = getLogger("pdfminer") - original_level = pdfminer_logger.level - - with warnings.catch_warnings(): - warnings.filterwarnings("ignore", category=UserWarning, module="pdfminer") - warnings.filterwarnings( - "ignore", message=".*Cannot set gray non-stroke color.*" - ) - warnings.filterwarnings("ignore", message=".*invalid float value.*") - pdfminer_logger.setLevel(ERROR) - - try: - result = await docling_service.process_document(file_path, filename) - finally: - pdfminer_logger.setLevel(original_level) - - return result["content"] diff --git a/surfsense_backend/app/tasks/document_processors/_helpers.py b/surfsense_backend/app/tasks/document_processors/_helpers.py index 7ac05932c..9cd7b87c9 100644 --- a/surfsense_backend/app/tasks/document_processors/_helpers.py +++ b/surfsense_backend/app/tasks/document_processors/_helpers.py @@ -11,13 +11,6 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.db import Document, DocumentStatus, DocumentType from app.utils.document_converters import generate_unique_identifier_hash -from ._constants import ( - BASE_JOB_TIMEOUT, - MAX_UPLOAD_TIMEOUT, - MIN_UPLOAD_TIMEOUT, - PER_PAGE_JOB_TIMEOUT, - UPLOAD_BYTES_PER_SECOND_SLOW, -) from .base import ( check_document_by_unique_identifier, check_duplicate_document, @@ -198,21 +191,3 @@ async def update_document_from_connector( if "connector_id" in connector: document.connector_id = connector["connector_id"] await session.commit() - - -# --------------------------------------------------------------------------- -# Timeout calculations -# --------------------------------------------------------------------------- - - -def calculate_upload_timeout(file_size_bytes: int) -> float: - """Calculate upload timeout based on file size (conservative for slow connections).""" - estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5 - return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT)) - - -def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float: - """Calculate job processing timeout based on page count and file size.""" - page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT) - size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60 - return max(page_based_timeout, size_based_timeout) diff --git a/surfsense_backend/app/tasks/document_processors/_save.py b/surfsense_backend/app/tasks/document_processors/_save.py index 5088ad004..ae45f7a69 100644 --- a/surfsense_backend/app/tasks/document_processors/_save.py +++ b/surfsense_backend/app/tasks/document_processors/_save.py @@ -1,14 +1,9 @@ """ Unified document save/update logic for file processors. - -Replaces the three nearly-identical ``add_received_file_document_using_*`` -functions with a single ``save_file_document`` function plus thin wrappers -for backward compatibility. """ import logging -from langchain_core.documents import Document as LangChainDocument from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession @@ -207,79 +202,3 @@ async def save_file_document( raise RuntimeError( f"Failed to process file document using {etl_service}: {e!s}" ) from e - - -# --------------------------------------------------------------------------- -# Backward-compatible wrapper functions -# --------------------------------------------------------------------------- - - -async def add_received_file_document_using_unstructured( - session: AsyncSession, - file_name: str, - unstructured_processed_elements: list[LangChainDocument], - search_space_id: int, - user_id: str, - connector: dict | None = None, - enable_summary: bool = True, -) -> Document | None: - """Process and store a file document using the Unstructured service.""" - from app.utils.document_converters import convert_document_to_markdown - - markdown_content = await convert_document_to_markdown( - unstructured_processed_elements - ) - return await save_file_document( - session, - file_name, - markdown_content, - search_space_id, - user_id, - "UNSTRUCTURED", - connector, - enable_summary, - ) - - -async def add_received_file_document_using_llamacloud( - session: AsyncSession, - file_name: str, - llamacloud_markdown_document: str, - search_space_id: int, - user_id: str, - connector: dict | None = None, - enable_summary: bool = True, -) -> Document | None: - """Process and store document content parsed by LlamaCloud.""" - return await save_file_document( - session, - file_name, - llamacloud_markdown_document, - search_space_id, - user_id, - "LLAMACLOUD", - connector, - enable_summary, - ) - - -async def add_received_file_document_using_docling( - session: AsyncSession, - file_name: str, - docling_markdown_document: str, - search_space_id: int, - user_id: str, - connector: dict | None = None, - enable_summary: bool = True, -) -> Document | None: - """Process and store document content parsed by Docling.""" - return await save_file_document( - session, - file_name, - docling_markdown_document, - search_space_id, - user_id, - "DOCLING", - connector, - enable_summary, - ) From 8224360afa532300ffcd3afb7f4ea2627b253e99 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:30:29 +0530 Subject: [PATCH 07/37] refactor: unify file parsing logic across Dropbox, Google Drive, and OneDrive using the ETL pipeline --- .../connectors/dropbox/content_extractor.py | 8 +- .../google_drive/content_extractor.py | 102 ++-------------- .../connectors/onedrive/content_extractor.py | 110 ++---------------- 3 files changed, 21 insertions(+), 199 deletions(-) diff --git a/surfsense_backend/app/connectors/dropbox/content_extractor.py b/surfsense_backend/app/connectors/dropbox/content_extractor.py index e89893b14..8e947eee7 100644 --- a/surfsense_backend/app/connectors/dropbox/content_extractor.py +++ b/surfsense_backend/app/connectors/dropbox/content_extractor.py @@ -87,9 +87,13 @@ async def download_and_extract_content( if error: return None, metadata, error - from app.connectors.onedrive.content_extractor import _parse_file_to_markdown + from app.etl_pipeline.etl_document import EtlRequest + from app.etl_pipeline.etl_pipeline_service import EtlPipelineService - markdown = await _parse_file_to_markdown(temp_file_path, file_name) + result = await EtlPipelineService().extract( + EtlRequest(file_path=temp_file_path, filename=file_name) + ) + markdown = result.markdown_content return markdown, metadata, None except Exception as e: diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 1e94133b4..0c559fee9 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -1,12 +1,9 @@ """Content extraction for Google Drive files.""" -import asyncio import contextlib import logging import os import tempfile -import threading -import time from pathlib import Path from typing import Any @@ -110,99 +107,14 @@ async def download_and_extract_content( async def _parse_file_to_markdown(file_path: str, filename: str) -> str: - """Parse a local file to markdown using the configured ETL service.""" - lower = filename.lower() + """Parse a local file to markdown using the unified ETL pipeline.""" + from app.etl_pipeline.etl_document import EtlRequest + from app.etl_pipeline.etl_pipeline_service import EtlPipelineService - if lower.endswith((".md", ".markdown", ".txt")): - with open(file_path, encoding="utf-8") as f: - return f.read() - - if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")): - from litellm import atranscription - - from app.config import config as app_config - - stt_service_type = ( - "local" - if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/") - else "external" - ) - if stt_service_type == "local": - from app.services.stt_service import stt_service - - t0 = time.monotonic() - logger.info( - f"[local-stt] START file={filename} thread={threading.current_thread().name}" - ) - result = await asyncio.to_thread(stt_service.transcribe_file, file_path) - logger.info( - f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s" - ) - text = result.get("text", "") - else: - with open(file_path, "rb") as audio_file: - kwargs: dict[str, Any] = { - "model": app_config.STT_SERVICE, - "file": audio_file, - "api_key": app_config.STT_SERVICE_API_KEY, - } - if app_config.STT_SERVICE_API_BASE: - kwargs["api_base"] = app_config.STT_SERVICE_API_BASE - resp = await atranscription(**kwargs) - text = resp.get("text", "") - - if not text: - raise ValueError("Transcription returned empty text") - return f"# Transcription of {filename}\n\n{text}" - - # Document files -- use configured ETL service - from app.config import config as app_config - - if app_config.ETL_SERVICE == "UNSTRUCTURED": - from langchain_unstructured import UnstructuredLoader - - from app.utils.document_converters import convert_document_to_markdown - - loader = UnstructuredLoader( - file_path, - mode="elements", - post_processors=[], - languages=["eng"], - include_orig_elements=False, - include_metadata=False, - strategy="auto", - ) - docs = await loader.aload() - return await convert_document_to_markdown(docs) - - if app_config.ETL_SERVICE == "LLAMACLOUD": - from app.tasks.document_processors.file_processors import ( - parse_with_llamacloud_retry, - ) - - result = await parse_with_llamacloud_retry( - file_path=file_path, estimated_pages=50 - ) - markdown_documents = await result.aget_markdown_documents(split_by_page=False) - if not markdown_documents: - raise RuntimeError(f"LlamaCloud returned no documents for {filename}") - return markdown_documents[0].text - - if app_config.ETL_SERVICE == "DOCLING": - from docling.document_converter import DocumentConverter - - converter = DocumentConverter() - t0 = time.monotonic() - logger.info( - f"[docling] START file={filename} thread={threading.current_thread().name}" - ) - result = await asyncio.to_thread(converter.convert, file_path) - logger.info( - f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s" - ) - return result.document.export_to_markdown() - - raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}") + result = await EtlPipelineService().extract( + EtlRequest(file_path=file_path, filename=filename) + ) + return result.markdown_content async def download_and_process_file( diff --git a/surfsense_backend/app/connectors/onedrive/content_extractor.py b/surfsense_backend/app/connectors/onedrive/content_extractor.py index 8917ba1fd..2355993eb 100644 --- a/surfsense_backend/app/connectors/onedrive/content_extractor.py +++ b/surfsense_backend/app/connectors/onedrive/content_extractor.py @@ -1,16 +1,9 @@ -"""Content extraction for OneDrive files. +"""Content extraction for OneDrive files.""" -Reuses the same ETL parsing logic as Google Drive since file parsing is -extension-based, not provider-specific. -""" - -import asyncio import contextlib import logging import os import tempfile -import threading -import time from pathlib import Path from typing import Any @@ -84,98 +77,11 @@ async def download_and_extract_content( async def _parse_file_to_markdown(file_path: str, filename: str) -> str: - """Parse a local file to markdown using the configured ETL service. + """Parse a local file to markdown using the unified ETL pipeline.""" + from app.etl_pipeline.etl_document import EtlRequest + from app.etl_pipeline.etl_pipeline_service import EtlPipelineService - Same logic as Google Drive -- file parsing is extension-based. - """ - lower = filename.lower() - - if lower.endswith((".md", ".markdown", ".txt")): - with open(file_path, encoding="utf-8") as f: - return f.read() - - if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")): - from litellm import atranscription - - from app.config import config as app_config - - stt_service_type = ( - "local" - if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/") - else "external" - ) - if stt_service_type == "local": - from app.services.stt_service import stt_service - - t0 = time.monotonic() - logger.info( - f"[local-stt] START file={filename} thread={threading.current_thread().name}" - ) - result = await asyncio.to_thread(stt_service.transcribe_file, file_path) - logger.info( - f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s" - ) - text = result.get("text", "") - else: - with open(file_path, "rb") as audio_file: - kwargs: dict[str, Any] = { - "model": app_config.STT_SERVICE, - "file": audio_file, - "api_key": app_config.STT_SERVICE_API_KEY, - } - if app_config.STT_SERVICE_API_BASE: - kwargs["api_base"] = app_config.STT_SERVICE_API_BASE - resp = await atranscription(**kwargs) - text = resp.get("text", "") - - if not text: - raise ValueError("Transcription returned empty text") - return f"# Transcription of {filename}\n\n{text}" - - from app.config import config as app_config - - if app_config.ETL_SERVICE == "UNSTRUCTURED": - from langchain_unstructured import UnstructuredLoader - - from app.utils.document_converters import convert_document_to_markdown - - loader = UnstructuredLoader( - file_path, - mode="elements", - post_processors=[], - languages=["eng"], - include_orig_elements=False, - include_metadata=False, - strategy="auto", - ) - docs = await loader.aload() - return await convert_document_to_markdown(docs) - - if app_config.ETL_SERVICE == "LLAMACLOUD": - from app.tasks.document_processors.file_processors import ( - parse_with_llamacloud_retry, - ) - - result = await parse_with_llamacloud_retry( - file_path=file_path, estimated_pages=50 - ) - markdown_documents = await result.aget_markdown_documents(split_by_page=False) - if not markdown_documents: - raise RuntimeError(f"LlamaCloud returned no documents for {filename}") - return markdown_documents[0].text - - if app_config.ETL_SERVICE == "DOCLING": - from docling.document_converter import DocumentConverter - - converter = DocumentConverter() - t0 = time.monotonic() - logger.info( - f"[docling] START file={filename} thread={threading.current_thread().name}" - ) - result = await asyncio.to_thread(converter.convert, file_path) - logger.info( - f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s" - ) - return result.document.export_to_markdown() - - raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}") + result = await EtlPipelineService().extract( + EtlRequest(file_path=file_path, filename=filename) + ) + return result.markdown_content From 87af012a60eee451e0af2311e3aa1547e6a6616e Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:45:18 +0530 Subject: [PATCH 08/37] refactor: streamline file processing by integrating ETL pipeline for all file types and removing redundant functions --- .../local_folder_indexer.py | 178 +--- .../document_processors/file_processors.py | 785 +++--------------- 2 files changed, 123 insertions(+), 840 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py index acfbce0bf..749dbf731 100644 --- a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py @@ -44,132 +44,6 @@ from .base import ( logger, ) -PLAINTEXT_EXTENSIONS = frozenset( - { - ".md", - ".markdown", - ".txt", - ".text", - ".json", - ".jsonl", - ".yaml", - ".yml", - ".toml", - ".ini", - ".cfg", - ".conf", - ".xml", - ".css", - ".scss", - ".less", - ".sass", - ".py", - ".pyw", - ".pyi", - ".pyx", - ".js", - ".jsx", - ".ts", - ".tsx", - ".mjs", - ".cjs", - ".java", - ".kt", - ".kts", - ".scala", - ".groovy", - ".c", - ".h", - ".cpp", - ".cxx", - ".cc", - ".hpp", - ".hxx", - ".cs", - ".fs", - ".fsx", - ".go", - ".rs", - ".rb", - ".php", - ".pl", - ".pm", - ".lua", - ".swift", - ".m", - ".mm", - ".r", - ".R", - ".jl", - ".sh", - ".bash", - ".zsh", - ".fish", - ".bat", - ".cmd", - ".ps1", - ".sql", - ".graphql", - ".gql", - ".env", - ".gitignore", - ".dockerignore", - ".editorconfig", - ".makefile", - ".cmake", - ".log", - ".rst", - ".tex", - ".bib", - ".org", - ".adoc", - ".asciidoc", - ".vue", - ".svelte", - ".astro", - ".tf", - ".hcl", - ".proto", - } -) - -AUDIO_EXTENSIONS = frozenset( - { - ".mp3", - ".mp4", - ".mpeg", - ".mpga", - ".m4a", - ".wav", - ".webm", - } -) - - -DIRECT_CONVERT_EXTENSIONS = frozenset({".csv", ".tsv", ".html", ".htm"}) - - -def _is_plaintext_file(filename: str) -> bool: - return Path(filename).suffix.lower() in PLAINTEXT_EXTENSIONS - - -def _is_audio_file(filename: str) -> bool: - return Path(filename).suffix.lower() in AUDIO_EXTENSIONS - - -def _is_direct_convert_file(filename: str) -> bool: - return Path(filename).suffix.lower() in DIRECT_CONVERT_EXTENSIONS - - -def _needs_etl(filename: str) -> bool: - """File is not plaintext, not audio, and not direct-convert — requires ETL.""" - return ( - not _is_plaintext_file(filename) - and not _is_audio_file(filename) - and not _is_direct_convert_file(filename) - ) - - HeartbeatCallbackType = Callable[[int], Awaitable[None]] @@ -278,57 +152,21 @@ def scan_folder( return files -def _read_plaintext_file(file_path: str) -> str: - """Read a plaintext/text-based file as UTF-8.""" - with open(file_path, encoding="utf-8", errors="replace") as f: - content = f.read() - if "\x00" in content: - raise ValueError( - f"File contains null bytes — likely a binary file opened as text: {file_path}" - ) - return content async def _read_file_content(file_path: str, filename: str) -> str: - """Read file content, using ETL for binary formats. + """Read file content via the unified ETL pipeline. - Plaintext files are read directly. Audio and document files (PDF, DOCX, etc.) - are routed through the configured ETL service (same as Google Drive / OneDrive). - - Raises ValueError if the file cannot be parsed (e.g. no ETL service configured - for a binary file). + All file types (plaintext, audio, direct-convert, document) are handled + by ``EtlPipelineService``. """ - if _is_plaintext_file(filename): - return _read_plaintext_file(file_path) + from app.etl_pipeline.etl_document import EtlRequest + from app.etl_pipeline.etl_pipeline_service import EtlPipelineService - if _is_direct_convert_file(filename): - from app.tasks.document_processors._direct_converters import ( - convert_file_directly, - ) - - return convert_file_directly(file_path, filename) - - if _is_audio_file(filename): - etl_service = config.ETL_SERVICE if hasattr(config, "ETL_SERVICE") else None - stt_service_val = config.STT_SERVICE if hasattr(config, "STT_SERVICE") else None - if not stt_service_val and not etl_service: - raise ValueError( - f"No STT_SERVICE configured — cannot transcribe audio file: {filename}" - ) - - if _needs_etl(filename): - etl_service = getattr(config, "ETL_SERVICE", None) - if not etl_service: - raise ValueError( - f"No ETL_SERVICE configured — cannot parse binary file: {filename}. " - f"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env" - ) - - from app.connectors.onedrive.content_extractor import ( - _parse_file_to_markdown, + result = await EtlPipelineService().extract( + EtlRequest(file_path=file_path, filename=filename) ) - - return await _parse_file_to_markdown(file_path, filename) + return result.markdown_content def _content_hash(content: str, search_space_id: int) -> str: diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 0c1cad52d..f54a963ad 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -1,14 +1,8 @@ """ File document processors orchestrating content extraction and indexing. -This module is the public entry point for file processing. It delegates to -specialised sub-modules that each own a single concern: - -- ``_constants`` — file type classification and configuration constants -- ``_helpers`` — document deduplication, migration, connector helpers -- ``_direct_converters`` — lossless file-to-markdown for csv/tsv/html -- ``_etl`` — ETL parsing strategies (Unstructured, LlamaCloud, Docling) -- ``_save`` — unified document creation / update logic +Delegates content extraction to ``app.etl_pipeline.EtlPipelineService`` and +keeps only orchestration concerns (notifications, logging, page limits, saving). """ from __future__ import annotations @@ -17,38 +11,19 @@ import contextlib import logging import os from dataclasses import dataclass, field -from logging import ERROR, getLogger from fastapi import HTTPException from sqlalchemy.ext.asyncio import AsyncSession -from app.config import config as app_config from app.db import Document, Log, Notification from app.services.notification_service import NotificationService from app.services.task_logging_service import TaskLoggingService -from ._constants import FileCategory, classify_file -from ._direct_converters import convert_file_directly -from ._etl import ( - parse_with_docling, - parse_with_llamacloud_retry, - parse_with_unstructured, -) from ._helpers import update_document_from_connector -from ._save import ( - add_received_file_document_using_docling, - add_received_file_document_using_llamacloud, - add_received_file_document_using_unstructured, - save_file_document, -) +from ._save import save_file_document from .markdown_processor import add_received_markdown_file_document -# Re-export public API so existing ``from file_processors import …`` keeps working. __all__ = [ - "add_received_file_document_using_docling", - "add_received_file_document_using_llamacloud", - "add_received_file_document_using_unstructured", - "parse_with_llamacloud_retry", "process_file_in_background", "process_file_in_background_with_document", "save_file_document", @@ -142,35 +117,31 @@ async def _log_page_divergence( # =================================================================== -async def _process_markdown_upload(ctx: _ProcessingContext) -> Document | None: - """Read a markdown / text file and create or update a document.""" - await _notify(ctx, "parsing", "Reading file") +async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | None: + """Extract content from a non-document file (plaintext/direct_convert/audio) via the unified ETL pipeline.""" + from app.etl_pipeline.etl_document import EtlRequest + from app.etl_pipeline.etl_pipeline_service import EtlPipelineService + + await _notify(ctx, "parsing", "Processing file") await ctx.task_logger.log_task_progress( ctx.log_entry, - f"Processing markdown/text file: {ctx.filename}", - {"file_type": "markdown", "processing_stage": "reading_file"}, + f"Processing file: {ctx.filename}", + {"processing_stage": "extracting"}, ) - with open(ctx.file_path, encoding="utf-8") as f: - markdown_content = f.read() + etl_result = await EtlPipelineService().extract( + EtlRequest(file_path=ctx.file_path, filename=ctx.filename) + ) with contextlib.suppress(Exception): os.unlink(ctx.file_path) await _notify(ctx, "chunking") - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Creating document from markdown content: {ctx.filename}", - { - "processing_stage": "creating_document", - "content_length": len(markdown_content), - }, - ) result = await add_received_markdown_file_document( ctx.session, ctx.filename, - markdown_content, + etl_result.markdown_content, ctx.search_space_id, ctx.user_id, ctx.connector, @@ -181,179 +152,19 @@ async def _process_markdown_upload(ctx: _ProcessingContext) -> Document | None: if result: await ctx.task_logger.log_task_success( ctx.log_entry, - f"Successfully processed markdown file: {ctx.filename}", + f"Successfully processed file: {ctx.filename}", { "document_id": result.id, "content_hash": result.content_hash, - "file_type": "markdown", + "file_type": etl_result.content_type, + "etl_service": etl_result.etl_service, }, ) else: await ctx.task_logger.log_task_success( ctx.log_entry, - f"Markdown file already exists (duplicate): {ctx.filename}", - {"duplicate_detected": True, "file_type": "markdown"}, - ) - return result - - -async def _process_direct_convert_upload(ctx: _ProcessingContext) -> Document | None: - """Convert a text-based file (csv/tsv/html) to markdown without ETL.""" - await _notify(ctx, "parsing", "Converting file") - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Direct-converting file to markdown: {ctx.filename}", - {"file_type": "direct_convert", "processing_stage": "converting"}, - ) - - markdown_content = convert_file_directly(ctx.file_path, ctx.filename) - - with contextlib.suppress(Exception): - os.unlink(ctx.file_path) - - await _notify(ctx, "chunking") - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Creating document from converted content: {ctx.filename}", - { - "processing_stage": "creating_document", - "content_length": len(markdown_content), - }, - ) - - result = await add_received_markdown_file_document( - ctx.session, - ctx.filename, - markdown_content, - ctx.search_space_id, - ctx.user_id, - ctx.connector, - ) - if ctx.connector: - await update_document_from_connector(result, ctx.connector, ctx.session) - - if result: - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Successfully direct-converted file: {ctx.filename}", - { - "document_id": result.id, - "content_hash": result.content_hash, - "file_type": "direct_convert", - }, - ) - else: - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Direct-converted file already exists (duplicate): {ctx.filename}", - {"duplicate_detected": True, "file_type": "direct_convert"}, - ) - return result - - -async def _process_audio_upload(ctx: _ProcessingContext) -> Document | None: - """Transcribe an audio file and create or update a document.""" - await _notify(ctx, "parsing", "Transcribing audio") - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Processing audio file for transcription: {ctx.filename}", - {"file_type": "audio", "processing_stage": "starting_transcription"}, - ) - - stt_service_type = ( - "local" - if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/") - else "external" - ) - - if stt_service_type == "local": - from app.services.stt_service import stt_service - - try: - stt_result = stt_service.transcribe_file(ctx.file_path) - transcribed_text = stt_result.get("text", "") - if not transcribed_text: - raise ValueError("Transcription returned empty text") - transcribed_text = ( - f"# Transcription of {ctx.filename}\n\n{transcribed_text}" - ) - except Exception as e: - raise HTTPException( - status_code=422, - detail=f"Failed to transcribe audio file {ctx.filename}: {e!s}", - ) from e - - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Local STT transcription completed: {ctx.filename}", - { - "processing_stage": "local_transcription_complete", - "language": stt_result.get("language"), - "confidence": stt_result.get("language_probability"), - "duration": stt_result.get("duration"), - }, - ) - else: - from litellm import atranscription - - with open(ctx.file_path, "rb") as audio_file: - transcription_kwargs: dict = { - "model": app_config.STT_SERVICE, - "file": audio_file, - "api_key": app_config.STT_SERVICE_API_KEY, - } - if app_config.STT_SERVICE_API_BASE: - transcription_kwargs["api_base"] = app_config.STT_SERVICE_API_BASE - - transcription_response = await atranscription(**transcription_kwargs) - transcribed_text = transcription_response.get("text", "") - if not transcribed_text: - raise ValueError("Transcription returned empty text") - - transcribed_text = f"# Transcription of {ctx.filename}\n\n{transcribed_text}" - - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Transcription completed, creating document: {ctx.filename}", - { - "processing_stage": "transcription_complete", - "transcript_length": len(transcribed_text), - }, - ) - - await _notify(ctx, "chunking") - - with contextlib.suppress(Exception): - os.unlink(ctx.file_path) - - result = await add_received_markdown_file_document( - ctx.session, - ctx.filename, - transcribed_text, - ctx.search_space_id, - ctx.user_id, - ctx.connector, - ) - if ctx.connector: - await update_document_from_connector(result, ctx.connector, ctx.session) - - if result: - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Successfully transcribed and processed audio file: {ctx.filename}", - { - "document_id": result.id, - "content_hash": result.content_hash, - "file_type": "audio", - "transcript_length": len(transcribed_text), - "stt_service": stt_service_type, - }, - ) - else: - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Audio file transcript already exists (duplicate): {ctx.filename}", - {"duplicate_detected": True, "file_type": "audio"}, + f"File already exists (duplicate): {ctx.filename}", + {"duplicate_detected": True, "file_type": etl_result.content_type}, ) return result @@ -363,279 +174,10 @@ async def _process_audio_upload(ctx: _ProcessingContext) -> Document | None: # --------------------------------------------------------------------------- -async def _etl_unstructured( - ctx: _ProcessingContext, - page_limit_service, - estimated_pages: int, -) -> Document | None: - """Parse and save via the Unstructured ETL service.""" - await _notify(ctx, "parsing", "Extracting content") - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Processing file with Unstructured ETL: {ctx.filename}", - { - "file_type": "document", - "etl_service": "UNSTRUCTURED", - "processing_stage": "loading", - }, - ) - - docs = await parse_with_unstructured(ctx.file_path) - - await _notify(ctx, "chunking", chunks_count=len(docs)) - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Unstructured ETL completed, creating document: {ctx.filename}", - {"processing_stage": "etl_complete", "elements_count": len(docs)}, - ) - - actual_pages = page_limit_service.estimate_pages_from_elements(docs) - final_pages = max(estimated_pages, actual_pages) - await _log_page_divergence( - ctx.task_logger, - ctx.log_entry, - ctx.filename, - estimated_pages, - actual_pages, - final_pages, - ) - - with contextlib.suppress(Exception): - os.unlink(ctx.file_path) - - result = await add_received_file_document_using_unstructured( - ctx.session, - ctx.filename, - docs, - ctx.search_space_id, - ctx.user_id, - ctx.connector, - enable_summary=ctx.enable_summary, - ) - if ctx.connector: - await update_document_from_connector(result, ctx.connector, ctx.session) - - if result: - await page_limit_service.update_page_usage( - ctx.user_id, final_pages, allow_exceed=True - ) - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Successfully processed file with Unstructured: {ctx.filename}", - { - "document_id": result.id, - "content_hash": result.content_hash, - "file_type": "document", - "etl_service": "UNSTRUCTURED", - "pages_processed": final_pages, - }, - ) - else: - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Document already exists (duplicate): {ctx.filename}", - { - "duplicate_detected": True, - "file_type": "document", - "etl_service": "UNSTRUCTURED", - }, - ) - return result - - -async def _etl_llamacloud( - ctx: _ProcessingContext, - page_limit_service, - estimated_pages: int, -) -> Document | None: - """Parse and save via the LlamaCloud ETL service.""" - await _notify(ctx, "parsing", "Extracting content") - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Processing file with LlamaCloud ETL: {ctx.filename}", - { - "file_type": "document", - "etl_service": "LLAMACLOUD", - "processing_stage": "parsing", - "estimated_pages": estimated_pages, - }, - ) - - raw_result = await parse_with_llamacloud_retry( - file_path=ctx.file_path, - estimated_pages=estimated_pages, - task_logger=ctx.task_logger, - log_entry=ctx.log_entry, - ) - - with contextlib.suppress(Exception): - os.unlink(ctx.file_path) - - markdown_documents = await raw_result.aget_markdown_documents(split_by_page=False) - - await _notify(ctx, "chunking", chunks_count=len(markdown_documents)) - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"LlamaCloud parsing completed, creating documents: {ctx.filename}", - { - "processing_stage": "parsing_complete", - "documents_count": len(markdown_documents), - }, - ) - - if not markdown_documents: - await ctx.task_logger.log_task_failure( - ctx.log_entry, - f"LlamaCloud parsing returned no documents: {ctx.filename}", - "ETL service returned empty document list", - {"error_type": "EmptyDocumentList", "etl_service": "LLAMACLOUD"}, - ) - raise ValueError(f"LlamaCloud parsing returned no documents for {ctx.filename}") - - actual_pages = page_limit_service.estimate_pages_from_markdown(markdown_documents) - final_pages = max(estimated_pages, actual_pages) - await _log_page_divergence( - ctx.task_logger, - ctx.log_entry, - ctx.filename, - estimated_pages, - actual_pages, - final_pages, - ) - - any_created = False - last_doc: Document | None = None - - for doc in markdown_documents: - doc_result = await add_received_file_document_using_llamacloud( - ctx.session, - ctx.filename, - llamacloud_markdown_document=doc.text, - search_space_id=ctx.search_space_id, - user_id=ctx.user_id, - connector=ctx.connector, - enable_summary=ctx.enable_summary, - ) - if doc_result: - any_created = True - last_doc = doc_result - - if any_created: - await page_limit_service.update_page_usage( - ctx.user_id, final_pages, allow_exceed=True - ) - if ctx.connector: - await update_document_from_connector(last_doc, ctx.connector, ctx.session) - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Successfully processed file with LlamaCloud: {ctx.filename}", - { - "document_id": last_doc.id, - "content_hash": last_doc.content_hash, - "file_type": "document", - "etl_service": "LLAMACLOUD", - "pages_processed": final_pages, - "documents_count": len(markdown_documents), - }, - ) - return last_doc - - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Document already exists (duplicate): {ctx.filename}", - { - "duplicate_detected": True, - "file_type": "document", - "etl_service": "LLAMACLOUD", - "documents_count": len(markdown_documents), - }, - ) - return None - - -async def _etl_docling( - ctx: _ProcessingContext, - page_limit_service, - estimated_pages: int, -) -> Document | None: - """Parse and save via the Docling ETL service.""" - await _notify(ctx, "parsing", "Extracting content") - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Processing file with Docling ETL: {ctx.filename}", - { - "file_type": "document", - "etl_service": "DOCLING", - "processing_stage": "parsing", - }, - ) - - content = await parse_with_docling(ctx.file_path, ctx.filename) - - with contextlib.suppress(Exception): - os.unlink(ctx.file_path) - - await ctx.task_logger.log_task_progress( - ctx.log_entry, - f"Docling parsing completed, creating document: {ctx.filename}", - {"processing_stage": "parsing_complete", "content_length": len(content)}, - ) - - actual_pages = page_limit_service.estimate_pages_from_content_length(len(content)) - final_pages = max(estimated_pages, actual_pages) - await _log_page_divergence( - ctx.task_logger, - ctx.log_entry, - ctx.filename, - estimated_pages, - actual_pages, - final_pages, - ) - - await _notify(ctx, "chunking") - - result = await add_received_file_document_using_docling( - ctx.session, - ctx.filename, - docling_markdown_document=content, - search_space_id=ctx.search_space_id, - user_id=ctx.user_id, - connector=ctx.connector, - enable_summary=ctx.enable_summary, - ) - - if result: - await page_limit_service.update_page_usage( - ctx.user_id, final_pages, allow_exceed=True - ) - if ctx.connector: - await update_document_from_connector(result, ctx.connector, ctx.session) - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Successfully processed file with Docling: {ctx.filename}", - { - "document_id": result.id, - "content_hash": result.content_hash, - "file_type": "document", - "etl_service": "DOCLING", - "pages_processed": final_pages, - }, - ) - else: - await ctx.task_logger.log_task_success( - ctx.log_entry, - f"Document already exists (duplicate): {ctx.filename}", - { - "duplicate_detected": True, - "file_type": "document", - "etl_service": "DOCLING", - }, - ) - return result - - async def _process_document_upload(ctx: _ProcessingContext) -> Document | None: - """Route a document file to the configured ETL service.""" + """Route a document file to the configured ETL service via the unified pipeline.""" + from app.etl_pipeline.etl_document import EtlRequest + from app.etl_pipeline.etl_pipeline_service import EtlPipelineService from app.services.page_limit_service import PageLimitExceededError, PageLimitService page_limit_service = PageLimitService(ctx.session) @@ -665,16 +207,60 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None: os.unlink(ctx.file_path) raise HTTPException(status_code=403, detail=str(e)) from e - etl_dispatch = { - "UNSTRUCTURED": _etl_unstructured, - "LLAMACLOUD": _etl_llamacloud, - "DOCLING": _etl_docling, - } - handler = etl_dispatch.get(app_config.ETL_SERVICE) - if handler is None: - raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}") + await _notify(ctx, "parsing", "Extracting content") - return await handler(ctx, page_limit_service, estimated_pages) + etl_result = await EtlPipelineService().extract( + EtlRequest( + file_path=ctx.file_path, + filename=ctx.filename, + estimated_pages=estimated_pages, + ) + ) + + with contextlib.suppress(Exception): + os.unlink(ctx.file_path) + + await _notify(ctx, "chunking") + + result = await save_file_document( + ctx.session, + ctx.filename, + etl_result.markdown_content, + ctx.search_space_id, + ctx.user_id, + etl_result.etl_service, + ctx.connector, + enable_summary=ctx.enable_summary, + ) + + if result: + await page_limit_service.update_page_usage( + ctx.user_id, estimated_pages, allow_exceed=True + ) + if ctx.connector: + await update_document_from_connector(result, ctx.connector, ctx.session) + await ctx.task_logger.log_task_success( + ctx.log_entry, + f"Successfully processed file: {ctx.filename}", + { + "document_id": result.id, + "content_hash": result.content_hash, + "file_type": "document", + "etl_service": etl_result.etl_service, + "pages_processed": estimated_pages, + }, + ) + else: + await ctx.task_logger.log_task_success( + ctx.log_entry, + f"Document already exists (duplicate): {ctx.filename}", + { + "duplicate_detected": True, + "file_type": "document", + "etl_service": etl_result.etl_service, + }, + ) + return result # =================================================================== @@ -706,15 +292,14 @@ async def process_file_in_background( ) try: - category = classify_file(filename) + from app.etl_pipeline.file_classifier import FileCategory as EtlFileCategory + from app.etl_pipeline.file_classifier import classify_file as etl_classify - if category == FileCategory.MARKDOWN: - return await _process_markdown_upload(ctx) - if category == FileCategory.DIRECT_CONVERT: - return await _process_direct_convert_upload(ctx) - if category == FileCategory.AUDIO: - return await _process_audio_upload(ctx) - return await _process_document_upload(ctx) + category = etl_classify(filename) + + if category == EtlFileCategory.DOCUMENT: + return await _process_document_upload(ctx) + return await _process_non_document_upload(ctx) except Exception as e: await session.rollback() @@ -758,201 +343,61 @@ async def _extract_file_content( Returns: Tuple of (markdown_content, etl_service_name). """ - category = classify_file(filename) + from app.etl_pipeline.etl_document import EtlRequest + from app.etl_pipeline.etl_pipeline_service import EtlPipelineService + from app.etl_pipeline.file_classifier import FileCategory + from app.etl_pipeline.file_classifier import classify_file as etl_classify - if category == FileCategory.MARKDOWN: - if notification: - await NotificationService.document_processing.notify_processing_progress( - session, - notification, - stage="parsing", - stage_message="Reading file", - ) - await task_logger.log_task_progress( - log_entry, - f"Processing markdown/text file: {filename}", - {"file_type": "markdown", "processing_stage": "reading_file"}, - ) - with open(file_path, encoding="utf-8") as f: - content = f.read() - with contextlib.suppress(Exception): - os.unlink(file_path) - return content, "MARKDOWN" - - if category == FileCategory.DIRECT_CONVERT: - if notification: - await NotificationService.document_processing.notify_processing_progress( - session, - notification, - stage="parsing", - stage_message="Converting file", - ) - await task_logger.log_task_progress( - log_entry, - f"Direct-converting file to markdown: {filename}", - {"file_type": "direct_convert", "processing_stage": "converting"}, - ) - content = convert_file_directly(file_path, filename) - with contextlib.suppress(Exception): - os.unlink(file_path) - return content, "DIRECT_CONVERT" - - if category == FileCategory.AUDIO: - if notification: - await NotificationService.document_processing.notify_processing_progress( - session, - notification, - stage="parsing", - stage_message="Transcribing audio", - ) - await task_logger.log_task_progress( - log_entry, - f"Processing audio file for transcription: {filename}", - {"file_type": "audio", "processing_stage": "starting_transcription"}, - ) - transcribed_text = await _transcribe_audio(file_path, filename) - with contextlib.suppress(Exception): - os.unlink(file_path) - return transcribed_text, "AUDIO_TRANSCRIPTION" - - # Document file — use ETL service - return await _extract_document_content( - file_path, - filename, - session, - user_id, - task_logger, - log_entry, - notification, - ) - - -async def _transcribe_audio(file_path: str, filename: str) -> str: - """Transcribe an audio file and return formatted markdown text.""" - stt_service_type = ( - "local" - if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/") - else "external" - ) - - if stt_service_type == "local": - from app.services.stt_service import stt_service - - result = stt_service.transcribe_file(file_path) - text = result.get("text", "") - if not text: - raise ValueError("Transcription returned empty text") - else: - from litellm import atranscription - - with open(file_path, "rb") as audio_file: - kwargs: dict = { - "model": app_config.STT_SERVICE, - "file": audio_file, - "api_key": app_config.STT_SERVICE_API_KEY, - } - if app_config.STT_SERVICE_API_BASE: - kwargs["api_base"] = app_config.STT_SERVICE_API_BASE - response = await atranscription(**kwargs) - text = response.get("text", "") - if not text: - raise ValueError("Transcription returned empty text") - - return f"# Transcription of {filename}\n\n{text}" - - -async def _extract_document_content( - file_path: str, - filename: str, - session: AsyncSession, - user_id: str, - task_logger: TaskLoggingService, - log_entry: Log, - notification: Notification | None, -) -> tuple[str, str]: - """ - Parse a document file via the configured ETL service. - - Returns: - Tuple of (markdown_content, etl_service_name). - """ - from app.services.page_limit_service import PageLimitService - - page_limit_service = PageLimitService(session) - - try: - estimated_pages = page_limit_service.estimate_pages_before_processing(file_path) - except Exception: - file_size = os.path.getsize(file_path) - estimated_pages = max(1, file_size // (80 * 1024)) - - await page_limit_service.check_page_limit(user_id, estimated_pages) - - etl_service = app_config.ETL_SERVICE - markdown_content: str | None = None + category = etl_classify(filename) + estimated_pages = 0 if notification: + stage_messages = { + FileCategory.PLAINTEXT: "Reading file", + FileCategory.DIRECT_CONVERT: "Converting file", + FileCategory.AUDIO: "Transcribing audio", + FileCategory.DOCUMENT: "Extracting content", + } await NotificationService.document_processing.notify_processing_progress( session, notification, stage="parsing", - stage_message="Extracting content", + stage_message=stage_messages.get(category, "Processing"), ) - if etl_service == "UNSTRUCTURED": - from app.utils.document_converters import convert_document_to_markdown + await task_logger.log_task_progress( + log_entry, + f"Processing {category.value} file: {filename}", + {"file_type": category.value, "processing_stage": "extracting"}, + ) - docs = await parse_with_unstructured(file_path) - markdown_content = await convert_document_to_markdown(docs) - actual_pages = page_limit_service.estimate_pages_from_elements(docs) - final_pages = max(estimated_pages, actual_pages) - await page_limit_service.update_page_usage( - user_id, final_pages, allow_exceed=True - ) + if category == FileCategory.DOCUMENT: + from app.services.page_limit_service import PageLimitService - elif etl_service == "LLAMACLOUD": - raw_result = await parse_with_llamacloud_retry( + page_limit_service = PageLimitService(session) + estimated_pages = _estimate_pages_safe(page_limit_service, file_path) + await page_limit_service.check_page_limit(user_id, estimated_pages) + + result = await EtlPipelineService().extract( + EtlRequest( file_path=file_path, + filename=filename, estimated_pages=estimated_pages, - task_logger=task_logger, - log_entry=log_entry, ) - markdown_documents = await raw_result.aget_markdown_documents( - split_by_page=False - ) - if not markdown_documents: - raise RuntimeError(f"LlamaCloud parsing returned no documents: {filename}") - markdown_content = markdown_documents[0].text + ) + + if category == FileCategory.DOCUMENT: await page_limit_service.update_page_usage( user_id, estimated_pages, allow_exceed=True ) - elif etl_service == "DOCLING": - getLogger("docling.pipeline.base_pipeline").setLevel(ERROR) - getLogger("docling.document_converter").setLevel(ERROR) - getLogger("docling_core.transforms.chunker.hierarchical_chunker").setLevel( - ERROR - ) - - from docling.document_converter import DocumentConverter - - converter = DocumentConverter() - result = converter.convert(file_path) - markdown_content = result.document.export_to_markdown() - await page_limit_service.update_page_usage( - user_id, estimated_pages, allow_exceed=True - ) - - else: - raise RuntimeError(f"Unknown ETL_SERVICE: {etl_service}") - with contextlib.suppress(Exception): os.unlink(file_path) - if not markdown_content: + if not result.markdown_content: raise RuntimeError(f"Failed to extract content from file: {filename}") - return markdown_content, etl_service + return result.markdown_content, result.etl_service async def process_file_in_background_with_document( From f8913adaa30eadd5407c8286c726bda783fe44a9 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 17:46:04 +0530 Subject: [PATCH 09/37] test: add unit tests for content extraction from cloud connectors and ETL pipeline functionality --- .../test_content_extraction.py | 244 ++++++++++++++ .../tests/unit/etl_pipeline/conftest.py | 29 ++ .../etl_pipeline/test_etl_pipeline_service.py | 309 ++++++++++++++++++ 3 files changed, 582 insertions(+) create mode 100644 surfsense_backend/tests/unit/connector_indexers/test_content_extraction.py create mode 100644 surfsense_backend/tests/unit/etl_pipeline/conftest.py create mode 100644 surfsense_backend/tests/unit/etl_pipeline/test_etl_pipeline_service.py diff --git a/surfsense_backend/tests/unit/connector_indexers/test_content_extraction.py b/surfsense_backend/tests/unit/connector_indexers/test_content_extraction.py new file mode 100644 index 000000000..49f9a217a --- /dev/null +++ b/surfsense_backend/tests/unit/connector_indexers/test_content_extraction.py @@ -0,0 +1,244 @@ +"""Tests that each cloud connector's download_and_extract_content correctly +produces markdown from a real file via the unified ETL pipeline. + +Only the cloud client is mocked (system boundary). The ETL pipeline runs for +real so we know the full path from "cloud gives us bytes" to "we get markdown +back" actually works. +""" + +import os +from unittest.mock import AsyncMock, MagicMock + +import pytest + +pytestmark = pytest.mark.unit + +_TXT_CONTENT = "Hello from the cloud connector test." +_CSV_CONTENT = "name,age\nAlice,30\nBob,25\n" + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +async def _write_file(dest_path: str, content: str) -> None: + """Simulate a cloud client writing downloaded bytes to disk.""" + with open(dest_path, "w", encoding="utf-8") as f: + f.write(content) + + +def _make_download_side_effect(content: str): + """Return an async side-effect that writes *content* to the dest path + and returns ``None`` (success).""" + + async def _side_effect(*args): + dest_path = args[-1] + await _write_file(dest_path, content) + return None + + return _side_effect + + +# =================================================================== +# Google Drive +# =================================================================== + +class TestGoogleDriveContentExtraction: + + async def test_txt_file_returns_markdown(self): + from app.connectors.google_drive.content_extractor import ( + download_and_extract_content, + ) + + client = MagicMock() + client.download_file_to_disk = AsyncMock( + side_effect=_make_download_side_effect(_TXT_CONTENT), + ) + + file = {"id": "f1", "name": "notes.txt", "mimeType": "text/plain"} + + markdown, metadata, error = await download_and_extract_content(client, file) + + assert error is None + assert _TXT_CONTENT in markdown + assert metadata["google_drive_file_id"] == "f1" + assert metadata["google_drive_file_name"] == "notes.txt" + + async def test_csv_file_returns_markdown_table(self): + from app.connectors.google_drive.content_extractor import ( + download_and_extract_content, + ) + + client = MagicMock() + client.download_file_to_disk = AsyncMock( + side_effect=_make_download_side_effect(_CSV_CONTENT), + ) + + file = {"id": "f2", "name": "data.csv", "mimeType": "text/csv"} + + markdown, metadata, error = await download_and_extract_content(client, file) + + assert error is None + assert "Alice" in markdown + assert "Bob" in markdown + assert "|" in markdown + + async def test_download_error_returns_error_message(self): + from app.connectors.google_drive.content_extractor import ( + download_and_extract_content, + ) + + client = MagicMock() + client.download_file_to_disk = AsyncMock(return_value="Network timeout") + + file = {"id": "f3", "name": "doc.txt", "mimeType": "text/plain"} + + markdown, metadata, error = await download_and_extract_content(client, file) + + assert markdown is None + assert error == "Network timeout" + + +# =================================================================== +# OneDrive +# =================================================================== + +class TestOneDriveContentExtraction: + + async def test_txt_file_returns_markdown(self): + from app.connectors.onedrive.content_extractor import ( + download_and_extract_content, + ) + + client = MagicMock() + client.download_file_to_disk = AsyncMock( + side_effect=_make_download_side_effect(_TXT_CONTENT), + ) + + file = { + "id": "od-1", + "name": "report.txt", + "file": {"mimeType": "text/plain"}, + } + + markdown, metadata, error = await download_and_extract_content(client, file) + + assert error is None + assert _TXT_CONTENT in markdown + assert metadata["onedrive_file_id"] == "od-1" + assert metadata["onedrive_file_name"] == "report.txt" + + async def test_csv_file_returns_markdown_table(self): + from app.connectors.onedrive.content_extractor import ( + download_and_extract_content, + ) + + client = MagicMock() + client.download_file_to_disk = AsyncMock( + side_effect=_make_download_side_effect(_CSV_CONTENT), + ) + + file = { + "id": "od-2", + "name": "data.csv", + "file": {"mimeType": "text/csv"}, + } + + markdown, metadata, error = await download_and_extract_content(client, file) + + assert error is None + assert "Alice" in markdown + assert "|" in markdown + + async def test_download_error_returns_error_message(self): + from app.connectors.onedrive.content_extractor import ( + download_and_extract_content, + ) + + client = MagicMock() + client.download_file_to_disk = AsyncMock(return_value="403 Forbidden") + + file = { + "id": "od-3", + "name": "secret.txt", + "file": {"mimeType": "text/plain"}, + } + + markdown, metadata, error = await download_and_extract_content(client, file) + + assert markdown is None + assert error == "403 Forbidden" + + +# =================================================================== +# Dropbox +# =================================================================== + +class TestDropboxContentExtraction: + + async def test_txt_file_returns_markdown(self): + from app.connectors.dropbox.content_extractor import ( + download_and_extract_content, + ) + + client = MagicMock() + client.download_file_to_disk = AsyncMock( + side_effect=_make_download_side_effect(_TXT_CONTENT), + ) + + file = { + "id": "dbx-1", + "name": "memo.txt", + ".tag": "file", + "path_lower": "/memo.txt", + } + + markdown, metadata, error = await download_and_extract_content(client, file) + + assert error is None + assert _TXT_CONTENT in markdown + assert metadata["dropbox_file_id"] == "dbx-1" + assert metadata["dropbox_file_name"] == "memo.txt" + + async def test_csv_file_returns_markdown_table(self): + from app.connectors.dropbox.content_extractor import ( + download_and_extract_content, + ) + + client = MagicMock() + client.download_file_to_disk = AsyncMock( + side_effect=_make_download_side_effect(_CSV_CONTENT), + ) + + file = { + "id": "dbx-2", + "name": "data.csv", + ".tag": "file", + "path_lower": "/data.csv", + } + + markdown, metadata, error = await download_and_extract_content(client, file) + + assert error is None + assert "Alice" in markdown + assert "|" in markdown + + async def test_download_error_returns_error_message(self): + from app.connectors.dropbox.content_extractor import ( + download_and_extract_content, + ) + + client = MagicMock() + client.download_file_to_disk = AsyncMock(return_value="Rate limited") + + file = { + "id": "dbx-3", + "name": "big.txt", + ".tag": "file", + "path_lower": "/big.txt", + } + + markdown, metadata, error = await download_and_extract_content(client, file) + + assert markdown is None + assert error == "Rate limited" diff --git a/surfsense_backend/tests/unit/etl_pipeline/conftest.py b/surfsense_backend/tests/unit/etl_pipeline/conftest.py new file mode 100644 index 000000000..6059caa01 --- /dev/null +++ b/surfsense_backend/tests/unit/etl_pipeline/conftest.py @@ -0,0 +1,29 @@ +"""Pre-register the etl_pipeline package to avoid circular imports during unit tests.""" + +import sys +import types +from pathlib import Path + +_BACKEND = Path(__file__).resolve().parents[3] + + +def _stub_package(dotted: str, fs_dir: Path) -> None: + if dotted not in sys.modules: + mod = types.ModuleType(dotted) + mod.__path__ = [str(fs_dir)] + mod.__package__ = dotted + sys.modules[dotted] = mod + + parts = dotted.split(".") + if len(parts) > 1: + parent_dotted = ".".join(parts[:-1]) + parent = sys.modules.get(parent_dotted) + if parent is not None: + setattr(parent, parts[-1], sys.modules[dotted]) + + +_stub_package("app", _BACKEND / "app") +_stub_package("app.etl_pipeline", _BACKEND / "app" / "etl_pipeline") +_stub_package( + "app.etl_pipeline.parsers", _BACKEND / "app" / "etl_pipeline" / "parsers" +) diff --git a/surfsense_backend/tests/unit/etl_pipeline/test_etl_pipeline_service.py b/surfsense_backend/tests/unit/etl_pipeline/test_etl_pipeline_service.py new file mode 100644 index 000000000..0d31507ca --- /dev/null +++ b/surfsense_backend/tests/unit/etl_pipeline/test_etl_pipeline_service.py @@ -0,0 +1,309 @@ +"""Tests for EtlPipelineService -- the unified ETL pipeline public interface.""" + +import pytest + +from app.etl_pipeline.etl_document import EtlRequest +from app.etl_pipeline.etl_pipeline_service import EtlPipelineService + +pytestmark = pytest.mark.unit + + +async def test_extract_txt_file_returns_markdown(tmp_path): + """Tracer bullet: a .txt file is read and returned as-is in an EtlResult.""" + txt_file = tmp_path / "hello.txt" + txt_file.write_text("Hello, world!", encoding="utf-8") + + service = EtlPipelineService() + result = await service.extract( + EtlRequest(file_path=str(txt_file), filename="hello.txt") + ) + + assert result.markdown_content == "Hello, world!" + assert result.etl_service == "PLAINTEXT" + assert result.content_type == "plaintext" + + +async def test_extract_md_file(tmp_path): + """A .md file is classified as PLAINTEXT and extracted.""" + md_file = tmp_path / "readme.md" + md_file.write_text("# Title\n\nBody text.", encoding="utf-8") + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(md_file), filename="readme.md") + ) + + assert result.markdown_content == "# Title\n\nBody text." + assert result.etl_service == "PLAINTEXT" + assert result.content_type == "plaintext" + + +async def test_extract_markdown_file(tmp_path): + """A .markdown file is classified as PLAINTEXT and extracted.""" + md_file = tmp_path / "notes.markdown" + md_file.write_text("Some notes.", encoding="utf-8") + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(md_file), filename="notes.markdown") + ) + + assert result.markdown_content == "Some notes." + assert result.etl_service == "PLAINTEXT" + + +async def test_extract_python_file(tmp_path): + """A .py source code file is classified as PLAINTEXT.""" + py_file = tmp_path / "script.py" + py_file.write_text("print('hello')", encoding="utf-8") + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(py_file), filename="script.py") + ) + + assert result.markdown_content == "print('hello')" + assert result.etl_service == "PLAINTEXT" + assert result.content_type == "plaintext" + + +async def test_extract_js_file(tmp_path): + """A .js source code file is classified as PLAINTEXT.""" + js_file = tmp_path / "app.js" + js_file.write_text("console.log('hi');", encoding="utf-8") + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(js_file), filename="app.js") + ) + + assert result.markdown_content == "console.log('hi');" + assert result.etl_service == "PLAINTEXT" + + +async def test_extract_csv_returns_markdown_table(tmp_path): + """A .csv file is converted to a markdown table.""" + csv_file = tmp_path / "data.csv" + csv_file.write_text("name,age\nAlice,30\nBob,25\n", encoding="utf-8") + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(csv_file), filename="data.csv") + ) + + assert "| name | age |" in result.markdown_content + assert "| Alice | 30 |" in result.markdown_content + assert result.etl_service == "DIRECT_CONVERT" + assert result.content_type == "direct_convert" + + +async def test_extract_tsv_returns_markdown_table(tmp_path): + """A .tsv file is converted to a markdown table.""" + tsv_file = tmp_path / "data.tsv" + tsv_file.write_text("x\ty\n1\t2\n", encoding="utf-8") + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(tsv_file), filename="data.tsv") + ) + + assert "| x | y |" in result.markdown_content + assert result.etl_service == "DIRECT_CONVERT" + + +async def test_extract_html_returns_markdown(tmp_path): + """An .html file is converted to markdown.""" + html_file = tmp_path / "page.html" + html_file.write_text("
Body
", encoding="utf-8") + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(html_file), filename="page.html") + ) + + assert "Title" in result.markdown_content + assert "Body" in result.markdown_content + assert result.etl_service == "DIRECT_CONVERT" + + +async def test_extract_mp3_returns_transcription(tmp_path, mocker): + """An .mp3 audio file is transcribed via litellm.atranscription.""" + audio_file = tmp_path / "recording.mp3" + audio_file.write_bytes(b"\x00" * 100) + + mocker.patch("app.config.config.STT_SERVICE", "openai/whisper-1") + mocker.patch("app.config.config.STT_SERVICE_API_KEY", "fake-key") + mocker.patch("app.config.config.STT_SERVICE_API_BASE", None) + + mock_transcription = mocker.patch( + "app.etl_pipeline.parsers.audio.atranscription", + return_value={"text": "Hello from audio"}, + ) + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(audio_file), filename="recording.mp3") + ) + + assert "Hello from audio" in result.markdown_content + assert result.etl_service == "AUDIO" + assert result.content_type == "audio" + mock_transcription.assert_called_once() + + +# --------------------------------------------------------------------------- +# Slice 7 – DOCLING document parsing +# --------------------------------------------------------------------------- + + +async def test_extract_pdf_with_docling(tmp_path, mocker): + """A .pdf file with ETL_SERVICE=DOCLING returns parsed markdown.""" + pdf_file = tmp_path / "report.pdf" + pdf_file.write_bytes(b"%PDF-1.4 fake") + + mocker.patch("app.config.config.ETL_SERVICE", "DOCLING") + + fake_docling = mocker.AsyncMock() + fake_docling.process_document.return_value = {"content": "# Parsed PDF"} + mocker.patch( + "app.services.docling_service.create_docling_service", + return_value=fake_docling, + ) + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(pdf_file), filename="report.pdf") + ) + + assert result.markdown_content == "# Parsed PDF" + assert result.etl_service == "DOCLING" + assert result.content_type == "document" + + +# --------------------------------------------------------------------------- +# Slice 8 – UNSTRUCTURED document parsing +# --------------------------------------------------------------------------- + + +async def test_extract_pdf_with_unstructured(tmp_path, mocker): + """A .pdf file with ETL_SERVICE=UNSTRUCTURED returns parsed markdown.""" + pdf_file = tmp_path / "report.pdf" + pdf_file.write_bytes(b"%PDF-1.4 fake") + + mocker.patch("app.config.config.ETL_SERVICE", "UNSTRUCTURED") + + class FakeDoc: + def __init__(self, text): + self.page_content = text + + fake_loader_instance = mocker.AsyncMock() + fake_loader_instance.aload.return_value = [ + FakeDoc("Page 1 content"), + FakeDoc("Page 2 content"), + ] + mocker.patch( + "langchain_unstructured.UnstructuredLoader", + return_value=fake_loader_instance, + ) + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(pdf_file), filename="report.pdf") + ) + + assert "Page 1 content" in result.markdown_content + assert "Page 2 content" in result.markdown_content + assert result.etl_service == "UNSTRUCTURED" + assert result.content_type == "document" + + +# --------------------------------------------------------------------------- +# Slice 9 – LLAMACLOUD document parsing +# --------------------------------------------------------------------------- + + +async def test_extract_pdf_with_llamacloud(tmp_path, mocker): + """A .pdf file with ETL_SERVICE=LLAMACLOUD returns parsed markdown.""" + pdf_file = tmp_path / "report.pdf" + pdf_file.write_bytes(b"%PDF-1.4 fake content " * 10) + + mocker.patch("app.config.config.ETL_SERVICE", "LLAMACLOUD") + mocker.patch("app.config.config.LLAMA_CLOUD_API_KEY", "fake-key", create=True) + + class FakeDoc: + text = "# LlamaCloud parsed" + + class FakeJobResult: + pages = [] + + def get_markdown_documents(self, split_by_page=True): + return [FakeDoc()] + + fake_parser = mocker.AsyncMock() + fake_parser.aparse.return_value = FakeJobResult() + mocker.patch( + "llama_cloud_services.LlamaParse", + return_value=fake_parser, + ) + mocker.patch( + "llama_cloud_services.parse.utils.ResultType", + mocker.MagicMock(MD="md"), + ) + + result = await EtlPipelineService().extract( + EtlRequest( + file_path=str(pdf_file), filename="report.pdf", estimated_pages=5 + ) + ) + + assert result.markdown_content == "# LlamaCloud parsed" + assert result.etl_service == "LLAMACLOUD" + assert result.content_type == "document" + + +# --------------------------------------------------------------------------- +# Slice 10 – unknown extension falls through to document ETL +# --------------------------------------------------------------------------- + + +async def test_unknown_extension_uses_document_etl(tmp_path, mocker): + """An unknown extension (e.g. .docx) falls through to the document ETL path.""" + docx_file = tmp_path / "doc.docx" + docx_file.write_bytes(b"PK fake docx") + + mocker.patch("app.config.config.ETL_SERVICE", "DOCLING") + + fake_docling = mocker.AsyncMock() + fake_docling.process_document.return_value = {"content": "Docx content"} + mocker.patch( + "app.services.docling_service.create_docling_service", + return_value=fake_docling, + ) + + result = await EtlPipelineService().extract( + EtlRequest(file_path=str(docx_file), filename="doc.docx") + ) + + assert result.markdown_content == "Docx content" + assert result.content_type == "document" + + +# --------------------------------------------------------------------------- +# Slice 11 – EtlRequest validation +# --------------------------------------------------------------------------- + + +def test_etl_request_requires_filename(): + """EtlRequest rejects missing filename.""" + with pytest.raises(Exception): + EtlRequest(file_path="/tmp/some.txt", filename="") + + +# --------------------------------------------------------------------------- +# Slice 12 – unknown ETL_SERVICE raises EtlServiceUnavailableError +# --------------------------------------------------------------------------- + + +async def test_unknown_etl_service_raises(tmp_path, mocker): + """An unknown ETL_SERVICE raises EtlServiceUnavailableError.""" + from app.etl_pipeline.exceptions import EtlServiceUnavailableError + + pdf_file = tmp_path / "report.pdf" + pdf_file.write_bytes(b"%PDF fake") + + mocker.patch("app.config.config.ETL_SERVICE", "NONEXISTENT") + + with pytest.raises(EtlServiceUnavailableError, match="Unknown ETL_SERVICE"): + await EtlPipelineService().extract( + EtlRequest(file_path=str(pdf_file), filename="report.pdf") + ) From c6e94188eb83b8b7989b393ea1fe23756273faf2 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 5 Apr 2026 18:23:32 +0530 Subject: [PATCH 10/37] refactor: remove destructive text classes from DocumentNode and enhance CreateSearchSpaceDialog with select-none and select-text classes --- surfsense_web/components/documents/DocumentNode.tsx | 2 -- .../layout/ui/dialogs/CreateSearchSpaceDialog.tsx | 6 +++--- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/surfsense_web/components/documents/DocumentNode.tsx b/surfsense_web/components/documents/DocumentNode.tsx index 919f904d4..33ce2bf26 100644 --- a/surfsense_web/components/documents/DocumentNode.tsx +++ b/surfsense_web/components/documents/DocumentNode.tsx @@ -260,7 +260,6 @@ export const DocumentNode = React.memo(function DocumentNode({ )}