From 8224360afa532300ffcd3afb7f4ea2627b253e99 Mon Sep 17 00:00:00 2001
From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com>
Date: Sun, 5 Apr 2026 17:30:29 +0530
Subject: [PATCH] refactor: unify file parsing logic across Dropbox, Google
 Drive, and OneDrive using the ETL pipeline

---
 .../connectors/dropbox/content_extractor.py   |   8 +-
 .../google_drive/content_extractor.py         | 102 ++--------------
 .../connectors/onedrive/content_extractor.py  | 110 ++----------------
 3 files changed, 21 insertions(+), 199 deletions(-)

diff --git a/surfsense_backend/app/connectors/dropbox/content_extractor.py b/surfsense_backend/app/connectors/dropbox/content_extractor.py
index e89893b14..8e947eee7 100644
--- a/surfsense_backend/app/connectors/dropbox/content_extractor.py
+++ b/surfsense_backend/app/connectors/dropbox/content_extractor.py
@@ -87,9 +87,13 @@ async def download_and_extract_content(
         if error:
             return None, metadata, error
 
-        from app.connectors.onedrive.content_extractor import _parse_file_to_markdown
+        from app.etl_pipeline.etl_document import EtlRequest
+        from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
 
-        markdown = await _parse_file_to_markdown(temp_file_path, file_name)
+        result = await EtlPipelineService().extract(
+            EtlRequest(file_path=temp_file_path, filename=file_name)
+        )
+        markdown = result.markdown_content
 
         return markdown, metadata, None
     except Exception as e:
diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py
index 1e94133b4..0c559fee9 100644
--- a/surfsense_backend/app/connectors/google_drive/content_extractor.py
+++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py
@@ -1,12 +1,9 @@
 """Content extraction for Google Drive files."""
 
-import asyncio
 import contextlib
 import logging
 import os
 import tempfile
-import threading
-import time
 from pathlib import Path
 from typing import Any
 
@@ -110,99 +107,14 @@ async def download_and_extract_content(
 
 
 async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
-    """Parse a local file to markdown using the configured ETL service."""
-    lower = filename.lower()
+    """Parse a local file to markdown using the unified ETL pipeline."""
+    from app.etl_pipeline.etl_document import EtlRequest
+    from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
 
-    if lower.endswith((".md", ".markdown", ".txt")):
-        with open(file_path, encoding="utf-8") as f:
-            return f.read()
-
-    if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")):
-        from litellm import atranscription
-
-        from app.config import config as app_config
-
-        stt_service_type = (
-            "local"
-            if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
-            else "external"
-        )
-        if stt_service_type == "local":
-            from app.services.stt_service import stt_service
-
-            t0 = time.monotonic()
-            logger.info(
-                f"[local-stt] START file={filename} thread={threading.current_thread().name}"
-            )
-            result = await asyncio.to_thread(stt_service.transcribe_file, file_path)
-            logger.info(
-                f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
-            )
-            text = result.get("text", "")
-        else:
-            with open(file_path, "rb") as audio_file:
-                kwargs: dict[str, Any] = {
-                    "model": app_config.STT_SERVICE,
-                    "file": audio_file,
-                    "api_key": app_config.STT_SERVICE_API_KEY,
-                }
-                if app_config.STT_SERVICE_API_BASE:
-                    kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
-                resp = await atranscription(**kwargs)
-                text = resp.get("text", "")
-
-        if not text:
-            raise ValueError("Transcription returned empty text")
-        return f"# Transcription of {filename}\n\n{text}"
-
-    # Document files -- use configured ETL service
-    from app.config import config as app_config
-
-    if app_config.ETL_SERVICE == "UNSTRUCTURED":
-        from langchain_unstructured import UnstructuredLoader
-
-        from app.utils.document_converters import convert_document_to_markdown
-
-        loader = UnstructuredLoader(
-            file_path,
-            mode="elements",
-            post_processors=[],
-            languages=["eng"],
-            include_orig_elements=False,
-            include_metadata=False,
-            strategy="auto",
-        )
-        docs = await loader.aload()
-        return await convert_document_to_markdown(docs)
-
-    if app_config.ETL_SERVICE == "LLAMACLOUD":
-        from app.tasks.document_processors.file_processors import (
-            parse_with_llamacloud_retry,
-        )
-
-        result = await parse_with_llamacloud_retry(
-            file_path=file_path, estimated_pages=50
-        )
-        markdown_documents = await result.aget_markdown_documents(split_by_page=False)
-        if not markdown_documents:
-            raise RuntimeError(f"LlamaCloud returned no documents for {filename}")
-        return markdown_documents[0].text
-
-    if app_config.ETL_SERVICE == "DOCLING":
-        from docling.document_converter import DocumentConverter
-
-        converter = DocumentConverter()
-        t0 = time.monotonic()
-        logger.info(
-            f"[docling] START file={filename} thread={threading.current_thread().name}"
-        )
-        result = await asyncio.to_thread(converter.convert, file_path)
-        logger.info(
-            f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
-        )
-        return result.document.export_to_markdown()
-
-    raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
+    result = await EtlPipelineService().extract(
+        EtlRequest(file_path=file_path, filename=filename)
+    )
+    return result.markdown_content
 
 
 async def download_and_process_file(
diff --git a/surfsense_backend/app/connectors/onedrive/content_extractor.py b/surfsense_backend/app/connectors/onedrive/content_extractor.py
index 8917ba1fd..2355993eb 100644
--- a/surfsense_backend/app/connectors/onedrive/content_extractor.py
+++ b/surfsense_backend/app/connectors/onedrive/content_extractor.py
@@ -1,16 +1,9 @@
-"""Content extraction for OneDrive files.
+"""Content extraction for OneDrive files."""
 
-Reuses the same ETL parsing logic as Google Drive since file parsing is
-extension-based, not provider-specific.
-"""
-
-import asyncio
 import contextlib
 import logging
 import os
 import tempfile
-import threading
-import time
 from pathlib import Path
 from typing import Any
 
@@ -84,98 +77,11 @@ async def download_and_extract_content(
 
 
 async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
-    """Parse a local file to markdown using the configured ETL service.
+    """Parse a local file to markdown using the unified ETL pipeline."""
+    from app.etl_pipeline.etl_document import EtlRequest
+    from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
 
-    Same logic as Google Drive -- file parsing is extension-based.
-    """
-    lower = filename.lower()
-
-    if lower.endswith((".md", ".markdown", ".txt")):
-        with open(file_path, encoding="utf-8") as f:
-            return f.read()
-
-    if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")):
-        from litellm import atranscription
-
-        from app.config import config as app_config
-
-        stt_service_type = (
-            "local"
-            if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
-            else "external"
-        )
-        if stt_service_type == "local":
-            from app.services.stt_service import stt_service
-
-            t0 = time.monotonic()
-            logger.info(
-                f"[local-stt] START file={filename} thread={threading.current_thread().name}"
-            )
-            result = await asyncio.to_thread(stt_service.transcribe_file, file_path)
-            logger.info(
-                f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
-            )
-            text = result.get("text", "")
-        else:
-            with open(file_path, "rb") as audio_file:
-                kwargs: dict[str, Any] = {
-                    "model": app_config.STT_SERVICE,
-                    "file": audio_file,
-                    "api_key": app_config.STT_SERVICE_API_KEY,
-                }
-                if app_config.STT_SERVICE_API_BASE:
-                    kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
-                resp = await atranscription(**kwargs)
-                text = resp.get("text", "")
-
-        if not text:
-            raise ValueError("Transcription returned empty text")
-        return f"# Transcription of {filename}\n\n{text}"
-
-    from app.config import config as app_config
-
-    if app_config.ETL_SERVICE == "UNSTRUCTURED":
-        from langchain_unstructured import UnstructuredLoader
-
-        from app.utils.document_converters import convert_document_to_markdown
-
-        loader = UnstructuredLoader(
-            file_path,
-            mode="elements",
-            post_processors=[],
-            languages=["eng"],
-            include_orig_elements=False,
-            include_metadata=False,
-            strategy="auto",
-        )
-        docs = await loader.aload()
-        return await convert_document_to_markdown(docs)
-
-    if app_config.ETL_SERVICE == "LLAMACLOUD":
-        from app.tasks.document_processors.file_processors import (
-            parse_with_llamacloud_retry,
-        )
-
-        result = await parse_with_llamacloud_retry(
-            file_path=file_path, estimated_pages=50
-        )
-        markdown_documents = await result.aget_markdown_documents(split_by_page=False)
-        if not markdown_documents:
-            raise RuntimeError(f"LlamaCloud returned no documents for {filename}")
-        return markdown_documents[0].text
-
-    if app_config.ETL_SERVICE == "DOCLING":
-        from docling.document_converter import DocumentConverter
-
-        converter = DocumentConverter()
-        t0 = time.monotonic()
-        logger.info(
-            f"[docling] START file={filename} thread={threading.current_thread().name}"
-        )
-        result = await asyncio.to_thread(converter.convert, file_path)
-        logger.info(
-            f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
-        )
-        return result.document.export_to_markdown()
-
-    raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
+    result = await EtlPipelineService().extract(
+        EtlRequest(file_path=file_path, filename=filename)
+    )
+    return result.markdown_content