test: add unit tests for content extraction from cloud connectors and ETL pipeline functionality

This commit is contained in:
Anish Sarkar 2026-04-05 17:46:04 +05:30
parent 87af012a60
commit f8913adaa3
3 changed files with 582 additions and 0 deletions

View file

@@ -0,0 +1,244 @@
"""Tests that each cloud connector's download_and_extract_content correctly
produces markdown from a real file via the unified ETL pipeline.
Only the cloud client is mocked (system boundary). The ETL pipeline runs for
real so we know the full path from "cloud gives us bytes" to "we get markdown
back" actually works.
"""
import os
from unittest.mock import AsyncMock, MagicMock
import pytest
# Every test in this module is a unit test (no external services hit).
pytestmark = pytest.mark.unit

# Fixture payloads that the mocked cloud clients "download" to disk.
_TXT_CONTENT = "Hello from the cloud connector test."
_CSV_CONTENT = "name,age\nAlice,30\nBob,25\n"
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
async def _write_file(dest_path: str, content: str) -> None:
"""Simulate a cloud client writing downloaded bytes to disk."""
with open(dest_path, "w", encoding="utf-8") as f:
f.write(content)
def _make_download_side_effect(content: str):
"""Return an async side-effect that writes *content* to the dest path
and returns ``None`` (success)."""
async def _side_effect(*args):
dest_path = args[-1]
await _write_file(dest_path, content)
return None
return _side_effect
# ===================================================================
# Google Drive
# ===================================================================
class TestGoogleDriveContentExtraction:
    """End-to-end extraction for Google Drive files; only the Drive client is
    mocked, the ETL pipeline runs for real."""

    async def test_txt_file_returns_markdown(self):
        from app.connectors.google_drive.content_extractor import (
            download_and_extract_content,
        )

        drive_client = MagicMock()
        drive_client.download_file_to_disk = AsyncMock(
            side_effect=_make_download_side_effect(_TXT_CONTENT)
        )
        drive_file = {"id": "f1", "name": "notes.txt", "mimeType": "text/plain"}

        markdown, metadata, error = await download_and_extract_content(
            drive_client, drive_file
        )

        assert error is None
        assert _TXT_CONTENT in markdown
        assert metadata["google_drive_file_id"] == "f1"
        assert metadata["google_drive_file_name"] == "notes.txt"

    async def test_csv_file_returns_markdown_table(self):
        from app.connectors.google_drive.content_extractor import (
            download_and_extract_content,
        )

        drive_client = MagicMock()
        drive_client.download_file_to_disk = AsyncMock(
            side_effect=_make_download_side_effect(_CSV_CONTENT)
        )
        drive_file = {"id": "f2", "name": "data.csv", "mimeType": "text/csv"}

        markdown, metadata, error = await download_and_extract_content(
            drive_client, drive_file
        )

        assert error is None
        assert "Alice" in markdown
        assert "Bob" in markdown
        assert "|" in markdown

    async def test_download_error_returns_error_message(self):
        from app.connectors.google_drive.content_extractor import (
            download_and_extract_content,
        )

        drive_client = MagicMock()
        drive_client.download_file_to_disk = AsyncMock(return_value="Network timeout")
        drive_file = {"id": "f3", "name": "doc.txt", "mimeType": "text/plain"}

        markdown, metadata, error = await download_and_extract_content(
            drive_client, drive_file
        )

        assert markdown is None
        assert error == "Network timeout"
# ===================================================================
# OneDrive
# ===================================================================
class TestOneDriveContentExtraction:
    """End-to-end extraction for OneDrive items; only the OneDrive client is
    mocked, the ETL pipeline runs for real."""

    @staticmethod
    def _client_downloading(content):
        # Build a mock client whose download writes *content* to disk.
        mock_client = MagicMock()
        mock_client.download_file_to_disk = AsyncMock(
            side_effect=_make_download_side_effect(content)
        )
        return mock_client

    async def test_txt_file_returns_markdown(self):
        from app.connectors.onedrive.content_extractor import (
            download_and_extract_content,
        )

        item = {
            "id": "od-1",
            "name": "report.txt",
            "file": {"mimeType": "text/plain"},
        }
        markdown, metadata, error = await download_and_extract_content(
            self._client_downloading(_TXT_CONTENT), item
        )

        assert error is None
        assert _TXT_CONTENT in markdown
        assert metadata["onedrive_file_id"] == "od-1"
        assert metadata["onedrive_file_name"] == "report.txt"

    async def test_csv_file_returns_markdown_table(self):
        from app.connectors.onedrive.content_extractor import (
            download_and_extract_content,
        )

        item = {
            "id": "od-2",
            "name": "data.csv",
            "file": {"mimeType": "text/csv"},
        }
        markdown, metadata, error = await download_and_extract_content(
            self._client_downloading(_CSV_CONTENT), item
        )

        assert error is None
        assert "Alice" in markdown
        assert "|" in markdown

    async def test_download_error_returns_error_message(self):
        from app.connectors.onedrive.content_extractor import (
            download_and_extract_content,
        )

        failing_client = MagicMock()
        failing_client.download_file_to_disk = AsyncMock(return_value="403 Forbidden")
        item = {
            "id": "od-3",
            "name": "secret.txt",
            "file": {"mimeType": "text/plain"},
        }
        markdown, metadata, error = await download_and_extract_content(
            failing_client, item
        )

        assert markdown is None
        assert error == "403 Forbidden"
# ===================================================================
# Dropbox
# ===================================================================
class TestDropboxContentExtraction:
    """End-to-end extraction for Dropbox entries; only the Dropbox client is
    mocked, the ETL pipeline runs for real."""

    async def test_txt_file_returns_markdown(self):
        from app.connectors.dropbox.content_extractor import (
            download_and_extract_content,
        )

        dbx_client = MagicMock()
        dbx_client.download_file_to_disk = AsyncMock(
            side_effect=_make_download_side_effect(_TXT_CONTENT)
        )
        entry = {
            "id": "dbx-1",
            "name": "memo.txt",
            ".tag": "file",
            "path_lower": "/memo.txt",
        }

        markdown, metadata, error = await download_and_extract_content(
            dbx_client, entry
        )

        assert error is None
        assert _TXT_CONTENT in markdown
        assert metadata["dropbox_file_id"] == "dbx-1"
        assert metadata["dropbox_file_name"] == "memo.txt"

    async def test_csv_file_returns_markdown_table(self):
        from app.connectors.dropbox.content_extractor import (
            download_and_extract_content,
        )

        dbx_client = MagicMock()
        dbx_client.download_file_to_disk = AsyncMock(
            side_effect=_make_download_side_effect(_CSV_CONTENT)
        )
        entry = {
            "id": "dbx-2",
            "name": "data.csv",
            ".tag": "file",
            "path_lower": "/data.csv",
        }

        markdown, metadata, error = await download_and_extract_content(
            dbx_client, entry
        )

        assert error is None
        assert "Alice" in markdown
        assert "|" in markdown

    async def test_download_error_returns_error_message(self):
        from app.connectors.dropbox.content_extractor import (
            download_and_extract_content,
        )

        dbx_client = MagicMock()
        dbx_client.download_file_to_disk = AsyncMock(return_value="Rate limited")
        entry = {
            "id": "dbx-3",
            "name": "big.txt",
            ".tag": "file",
            "path_lower": "/big.txt",
        }

        markdown, metadata, error = await download_and_extract_content(
            dbx_client, entry
        )

        assert markdown is None
        assert error == "Rate limited"

View file

@@ -0,0 +1,29 @@
"""Pre-register the etl_pipeline package to avoid circular imports during unit tests."""
import sys
import types
from pathlib import Path
_BACKEND = Path(__file__).resolve().parents[3]
def _stub_package(dotted: str, fs_dir: Path) -> None:
if dotted not in sys.modules:
mod = types.ModuleType(dotted)
mod.__path__ = [str(fs_dir)]
mod.__package__ = dotted
sys.modules[dotted] = mod
parts = dotted.split(".")
if len(parts) > 1:
parent_dotted = ".".join(parts[:-1])
parent = sys.modules.get(parent_dotted)
if parent is not None:
setattr(parent, parts[-1], sys.modules[dotted])
# Pre-register lightweight stubs for the app / etl_pipeline package chain.
# The ModuleType stubs have empty bodies, so importing their submodules does
# not execute the packages' real __init__ modules (avoiding the circular
# imports this conftest exists to prevent), while __path__ still points at
# the real directories so submodules load from disk.
_stub_package("app", _BACKEND / "app")
_stub_package("app.etl_pipeline", _BACKEND / "app" / "etl_pipeline")
_stub_package(
    "app.etl_pipeline.parsers", _BACKEND / "app" / "etl_pipeline" / "parsers"
)

View file

@@ -0,0 +1,309 @@
"""Tests for EtlPipelineService -- the unified ETL pipeline public interface."""
import pytest
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
pytestmark = pytest.mark.unit
async def test_extract_txt_file_returns_markdown(tmp_path):
    """Tracer bullet: a .txt file is read and returned as-is in an EtlResult."""
    source = tmp_path / "hello.txt"
    source.write_text("Hello, world!", encoding="utf-8")

    request = EtlRequest(file_path=str(source), filename="hello.txt")
    result = await EtlPipelineService().extract(request)

    assert result.markdown_content == "Hello, world!"
    assert result.etl_service == "PLAINTEXT"
    assert result.content_type == "plaintext"
async def test_extract_md_file(tmp_path):
    """A .md file is classified as PLAINTEXT and extracted."""
    source = tmp_path / "readme.md"
    source.write_text("# Title\n\nBody text.", encoding="utf-8")

    request = EtlRequest(file_path=str(source), filename="readme.md")
    result = await EtlPipelineService().extract(request)

    assert result.markdown_content == "# Title\n\nBody text."
    assert result.etl_service == "PLAINTEXT"
    assert result.content_type == "plaintext"
async def test_extract_markdown_file(tmp_path):
    """A .markdown file is classified as PLAINTEXT and extracted."""
    source = tmp_path / "notes.markdown"
    source.write_text("Some notes.", encoding="utf-8")

    request = EtlRequest(file_path=str(source), filename="notes.markdown")
    result = await EtlPipelineService().extract(request)

    assert result.markdown_content == "Some notes."
    assert result.etl_service == "PLAINTEXT"
async def test_extract_python_file(tmp_path):
    """A .py source code file is classified as PLAINTEXT."""
    source = tmp_path / "script.py"
    source.write_text("print('hello')", encoding="utf-8")

    request = EtlRequest(file_path=str(source), filename="script.py")
    result = await EtlPipelineService().extract(request)

    assert result.markdown_content == "print('hello')"
    assert result.etl_service == "PLAINTEXT"
    assert result.content_type == "plaintext"
async def test_extract_js_file(tmp_path):
    """A .js source code file is classified as PLAINTEXT."""
    source = tmp_path / "app.js"
    source.write_text("console.log('hi');", encoding="utf-8")

    request = EtlRequest(file_path=str(source), filename="app.js")
    result = await EtlPipelineService().extract(request)

    assert result.markdown_content == "console.log('hi');"
    assert result.etl_service == "PLAINTEXT"
async def test_extract_csv_returns_markdown_table(tmp_path):
    """A .csv file is converted to a markdown table."""
    source = tmp_path / "data.csv"
    source.write_text("name,age\nAlice,30\nBob,25\n", encoding="utf-8")

    request = EtlRequest(file_path=str(source), filename="data.csv")
    result = await EtlPipelineService().extract(request)

    assert "| name | age |" in result.markdown_content
    assert "| Alice | 30 |" in result.markdown_content
    assert result.etl_service == "DIRECT_CONVERT"
    assert result.content_type == "direct_convert"
async def test_extract_tsv_returns_markdown_table(tmp_path):
    """A .tsv file is converted to a markdown table."""
    source = tmp_path / "data.tsv"
    source.write_text("x\ty\n1\t2\n", encoding="utf-8")

    request = EtlRequest(file_path=str(source), filename="data.tsv")
    result = await EtlPipelineService().extract(request)

    assert "| x | y |" in result.markdown_content
    assert result.etl_service == "DIRECT_CONVERT"
async def test_extract_html_returns_markdown(tmp_path):
    """An .html file is converted to markdown."""
    source = tmp_path / "page.html"
    source.write_text("<h1>Title</h1><p>Body</p>", encoding="utf-8")

    request = EtlRequest(file_path=str(source), filename="page.html")
    result = await EtlPipelineService().extract(request)

    assert "Title" in result.markdown_content
    assert "Body" in result.markdown_content
    assert result.etl_service == "DIRECT_CONVERT"
async def test_extract_mp3_returns_transcription(tmp_path, mocker):
    """An .mp3 audio file is transcribed via litellm.atranscription."""
    audio_file = tmp_path / "recording.mp3"
    # Content is irrelevant — transcription is mocked below.
    audio_file.write_bytes(b"\x00" * 100)
    # Point the STT config at a fake provider/key so the audio path can be
    # exercised without real credentials.
    mocker.patch("app.config.config.STT_SERVICE", "openai/whisper-1")
    mocker.patch("app.config.config.STT_SERVICE_API_KEY", "fake-key")
    mocker.patch("app.config.config.STT_SERVICE_API_BASE", None)
    # Mock the transcription call itself (the network boundary).
    mock_transcription = mocker.patch(
        "app.etl_pipeline.parsers.audio.atranscription",
        return_value={"text": "Hello from audio"},
    )
    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(audio_file), filename="recording.mp3")
    )
    assert "Hello from audio" in result.markdown_content
    assert result.etl_service == "AUDIO"
    assert result.content_type == "audio"
    # The pipeline must have gone through the mocked transcription exactly once.
    mock_transcription.assert_called_once()
# ---------------------------------------------------------------------------
# Slice 7 DOCLING document parsing
# ---------------------------------------------------------------------------
async def test_extract_pdf_with_docling(tmp_path, mocker):
    """A .pdf file with ETL_SERVICE=DOCLING returns parsed markdown."""
    pdf_file = tmp_path / "report.pdf"
    # A fake PDF header is enough — the parser itself is mocked below.
    pdf_file.write_bytes(b"%PDF-1.4 fake")
    # Select the DOCLING backend and replace the service factory so no real
    # docling service is contacted.
    mocker.patch("app.config.config.ETL_SERVICE", "DOCLING")
    fake_docling = mocker.AsyncMock()
    fake_docling.process_document.return_value = {"content": "# Parsed PDF"}
    mocker.patch(
        "app.services.docling_service.create_docling_service",
        return_value=fake_docling,
    )
    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(pdf_file), filename="report.pdf")
    )
    # The "content" value of the fake service becomes the markdown result.
    assert result.markdown_content == "# Parsed PDF"
    assert result.etl_service == "DOCLING"
    assert result.content_type == "document"
# ---------------------------------------------------------------------------
# Slice 8 UNSTRUCTURED document parsing
# ---------------------------------------------------------------------------
async def test_extract_pdf_with_unstructured(tmp_path, mocker):
    """A .pdf file with ETL_SERVICE=UNSTRUCTURED returns parsed markdown."""
    pdf_file = tmp_path / "report.pdf"
    pdf_file.write_bytes(b"%PDF-1.4 fake")
    # Route document parsing through the UNSTRUCTURED backend.
    mocker.patch("app.config.config.ETL_SERVICE", "UNSTRUCTURED")

    # Minimal stand-in for a loader document; it exposes only page_content.
    class FakeDoc:
        def __init__(self, text):
            self.page_content = text

    # The loader's async aload() yields two fake pages.
    fake_loader_instance = mocker.AsyncMock()
    fake_loader_instance.aload.return_value = [
        FakeDoc("Page 1 content"),
        FakeDoc("Page 2 content"),
    ]
    mocker.patch(
        "langchain_unstructured.UnstructuredLoader",
        return_value=fake_loader_instance,
    )
    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(pdf_file), filename="report.pdf")
    )
    # Both pages must appear in the combined markdown output.
    assert "Page 1 content" in result.markdown_content
    assert "Page 2 content" in result.markdown_content
    assert result.etl_service == "UNSTRUCTURED"
    assert result.content_type == "document"
# ---------------------------------------------------------------------------
# Slice 9 LLAMACLOUD document parsing
# ---------------------------------------------------------------------------
async def test_extract_pdf_with_llamacloud(tmp_path, mocker):
    """A .pdf file with ETL_SERVICE=LLAMACLOUD returns parsed markdown."""
    pdf_file = tmp_path / "report.pdf"
    pdf_file.write_bytes(b"%PDF-1.4 fake content " * 10)
    mocker.patch("app.config.config.ETL_SERVICE", "LLAMACLOUD")
    # create=True: the config attribute may not exist in the test environment.
    mocker.patch("app.config.config.LLAMA_CLOUD_API_KEY", "fake-key", create=True)

    # Stand-ins for the llama_cloud_services result objects: a document
    # exposing .text and a job result exposing .pages plus
    # get_markdown_documents().
    class FakeDoc:
        text = "# LlamaCloud parsed"

    class FakeJobResult:
        pages = []

        def get_markdown_documents(self, split_by_page=True):
            return [FakeDoc()]

    fake_parser = mocker.AsyncMock()
    fake_parser.aparse.return_value = FakeJobResult()
    mocker.patch(
        "llama_cloud_services.LlamaParse",
        return_value=fake_parser,
    )
    # Replace ResultType so this test never needs the real SDK enum.
    mocker.patch(
        "llama_cloud_services.parse.utils.ResultType",
        mocker.MagicMock(MD="md"),
    )
    result = await EtlPipelineService().extract(
        EtlRequest(
            file_path=str(pdf_file), filename="report.pdf", estimated_pages=5
        )
    )
    assert result.markdown_content == "# LlamaCloud parsed"
    assert result.etl_service == "LLAMACLOUD"
    assert result.content_type == "document"
# ---------------------------------------------------------------------------
# Slice 10 unknown extension falls through to document ETL
# ---------------------------------------------------------------------------
async def test_unknown_extension_uses_document_etl(tmp_path, mocker):
    """An unknown extension (e.g. .docx) falls through to the document ETL path."""
    docx_file = tmp_path / "doc.docx"
    docx_file.write_bytes(b"PK fake docx")
    # With DOCLING selected, the fallthrough should land in the mocked
    # docling service rather than any specialized parser.
    mocker.patch("app.config.config.ETL_SERVICE", "DOCLING")
    fake_docling = mocker.AsyncMock()
    fake_docling.process_document.return_value = {"content": "Docx content"}
    mocker.patch(
        "app.services.docling_service.create_docling_service",
        return_value=fake_docling,
    )
    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(docx_file), filename="doc.docx")
    )
    assert result.markdown_content == "Docx content"
    assert result.content_type == "document"
# ---------------------------------------------------------------------------
# Slice 11 EtlRequest validation
# ---------------------------------------------------------------------------
def test_etl_request_requires_filename():
    """Constructing an EtlRequest with an empty filename must fail."""
    bad_kwargs = {"file_path": "/tmp/some.txt", "filename": ""}
    with pytest.raises(Exception):
        EtlRequest(**bad_kwargs)
# ---------------------------------------------------------------------------
# Slice 12 unknown ETL_SERVICE raises EtlServiceUnavailableError
# ---------------------------------------------------------------------------
async def test_unknown_etl_service_raises(tmp_path, mocker):
    """An unknown ETL_SERVICE raises EtlServiceUnavailableError."""
    from app.etl_pipeline.exceptions import EtlServiceUnavailableError

    pdf_file = tmp_path / "report.pdf"
    pdf_file.write_bytes(b"%PDF fake")
    # Misconfigured backend name: the pipeline must fail loudly with a
    # matching error message, not silently fall back to another parser.
    mocker.patch("app.config.config.ETL_SERVICE", "NONEXISTENT")
    with pytest.raises(EtlServiceUnavailableError, match="Unknown ETL_SERVICE"):
        await EtlPipelineService().extract(
            EtlRequest(file_path=str(pdf_file), filename="report.pdf")
        )