refactor: unify file parsing logic across Dropbox, Google Drive, and OneDrive using the ETL pipeline

This commit is contained in:
Anish Sarkar 2026-04-05 17:30:29 +05:30
parent 1248363ca9
commit 8224360afa
3 changed files with 21 additions and 199 deletions

View file

@ -87,9 +87,13 @@ async def download_and_extract_content(
if error: if error:
return None, metadata, error return None, metadata, error
from app.connectors.onedrive.content_extractor import _parse_file_to_markdown from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
markdown = await _parse_file_to_markdown(temp_file_path, file_name) result = await EtlPipelineService().extract(
EtlRequest(file_path=temp_file_path, filename=file_name)
)
markdown = result.markdown_content
return markdown, metadata, None return markdown, metadata, None
except Exception as e: except Exception as e:

View file

@ -1,12 +1,9 @@
"""Content extraction for Google Drive files.""" """Content extraction for Google Drive files."""
import asyncio
import contextlib import contextlib
import logging import logging
import os import os
import tempfile import tempfile
import threading
import time
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -110,99 +107,14 @@ async def download_and_extract_content(
async def _parse_file_to_markdown(file_path: str, filename: str) -> str: async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
"""Parse a local file to markdown using the configured ETL service.""" """Parse a local file to markdown using the unified ETL pipeline."""
lower = filename.lower() from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
if lower.endswith((".md", ".markdown", ".txt")): result = await EtlPipelineService().extract(
with open(file_path, encoding="utf-8") as f: EtlRequest(file_path=file_path, filename=filename)
return f.read() )
return result.markdown_content
if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")):
from litellm import atranscription
from app.config import config as app_config
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
t0 = time.monotonic()
logger.info(
f"[local-stt] START file={filename} thread={threading.current_thread().name}"
)
result = await asyncio.to_thread(stt_service.transcribe_file, file_path)
logger.info(
f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
)
text = result.get("text", "")
else:
with open(file_path, "rb") as audio_file:
kwargs: dict[str, Any] = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
resp = await atranscription(**kwargs)
text = resp.get("text", "")
if not text:
raise ValueError("Transcription returned empty text")
return f"# Transcription of {filename}\n\n{text}"
# Document files -- use configured ETL service
from app.config import config as app_config
if app_config.ETL_SERVICE == "UNSTRUCTURED":
from langchain_unstructured import UnstructuredLoader
from app.utils.document_converters import convert_document_to_markdown
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
return await convert_document_to_markdown(docs)
if app_config.ETL_SERVICE == "LLAMACLOUD":
from app.tasks.document_processors.file_processors import (
parse_with_llamacloud_retry,
)
result = await parse_with_llamacloud_retry(
file_path=file_path, estimated_pages=50
)
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
if not markdown_documents:
raise RuntimeError(f"LlamaCloud returned no documents for {filename}")
return markdown_documents[0].text
if app_config.ETL_SERVICE == "DOCLING":
from docling.document_converter import DocumentConverter
converter = DocumentConverter()
t0 = time.monotonic()
logger.info(
f"[docling] START file={filename} thread={threading.current_thread().name}"
)
result = await asyncio.to_thread(converter.convert, file_path)
logger.info(
f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
)
return result.document.export_to_markdown()
raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
async def download_and_process_file( async def download_and_process_file(

View file

@ -1,16 +1,9 @@
"""Content extraction for OneDrive files. """Content extraction for OneDrive files."""
Reuses the same ETL parsing logic as Google Drive since file parsing is
extension-based, not provider-specific.
"""
import asyncio
import contextlib import contextlib
import logging import logging
import os import os
import tempfile import tempfile
import threading
import time
from pathlib import Path from pathlib import Path
from typing import Any from typing import Any
@ -84,98 +77,11 @@ async def download_and_extract_content(
async def _parse_file_to_markdown(file_path: str, filename: str) -> str: async def _parse_file_to_markdown(file_path: str, filename: str) -> str:
"""Parse a local file to markdown using the configured ETL service. """Parse a local file to markdown using the unified ETL pipeline."""
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
Same logic as Google Drive -- file parsing is extension-based. result = await EtlPipelineService().extract(
""" EtlRequest(file_path=file_path, filename=filename)
lower = filename.lower() )
return result.markdown_content
if lower.endswith((".md", ".markdown", ".txt")):
with open(file_path, encoding="utf-8") as f:
return f.read()
if lower.endswith((".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm")):
from litellm import atranscription
from app.config import config as app_config
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
t0 = time.monotonic()
logger.info(
f"[local-stt] START file={filename} thread={threading.current_thread().name}"
)
result = await asyncio.to_thread(stt_service.transcribe_file, file_path)
logger.info(
f"[local-stt] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
)
text = result.get("text", "")
else:
with open(file_path, "rb") as audio_file:
kwargs: dict[str, Any] = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
resp = await atranscription(**kwargs)
text = resp.get("text", "")
if not text:
raise ValueError("Transcription returned empty text")
return f"# Transcription of {filename}\n\n{text}"
from app.config import config as app_config
if app_config.ETL_SERVICE == "UNSTRUCTURED":
from langchain_unstructured import UnstructuredLoader
from app.utils.document_converters import convert_document_to_markdown
loader = UnstructuredLoader(
file_path,
mode="elements",
post_processors=[],
languages=["eng"],
include_orig_elements=False,
include_metadata=False,
strategy="auto",
)
docs = await loader.aload()
return await convert_document_to_markdown(docs)
if app_config.ETL_SERVICE == "LLAMACLOUD":
from app.tasks.document_processors.file_processors import (
parse_with_llamacloud_retry,
)
result = await parse_with_llamacloud_retry(
file_path=file_path, estimated_pages=50
)
markdown_documents = await result.aget_markdown_documents(split_by_page=False)
if not markdown_documents:
raise RuntimeError(f"LlamaCloud returned no documents for {filename}")
return markdown_documents[0].text
if app_config.ETL_SERVICE == "DOCLING":
from docling.document_converter import DocumentConverter
converter = DocumentConverter()
t0 = time.monotonic()
logger.info(
f"[docling] START file={filename} thread={threading.current_thread().name}"
)
result = await asyncio.to_thread(converter.convert, file_path)
logger.info(
f"[docling] END file={filename} elapsed={time.monotonic() - t0:.2f}s"
)
return result.document.export_to_markdown()
raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")