feat: add integration and unit tests for OneDrive indexing pipeline and parallel downloads

This commit is contained in:
Anish Sarkar 2026-03-28 16:39:47 +05:30
parent 7004e764a9
commit 028c88be72
4 changed files with 330 additions and 3 deletions

View file

@@ -0,0 +1,100 @@
"""Integration tests: OneDrive ConnectorDocuments flow through the pipeline."""
import pytest
from sqlalchemy import select
from app.config import config as app_config
from app.db import Document, DocumentStatus, DocumentType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
_EMBEDDING_DIM = app_config.embedding_model_instance.dimension
pytestmark = pytest.mark.integration
def _onedrive_doc(*, unique_id: str, search_space_id: int, connector_id: int, user_id: str) -> ConnectorDocument:
    """Build a summarizable OneDrive .docx ConnectorDocument for pipeline tests."""
    file_name = f"{unique_id}.docx"
    graph_metadata = {
        "onedrive_file_id": unique_id,
        "onedrive_file_name": file_name,
        "document_type": "OneDrive File",
    }
    return ConnectorDocument(
        title=f"File {file_name}",
        source_markdown=f"## Document\n\nContent from {unique_id}",
        unique_id=unique_id,
        document_type=DocumentType.ONEDRIVE_FILE,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=True,
        fallback_summary=f"File: {file_name}",
        metadata=graph_metadata,
    )
@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_onedrive_pipeline_creates_ready_document(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """A OneDrive ConnectorDocument flows through prepare + index to a READY document."""
    # Capture the id up front so later awaits can't interfere with attribute access.
    target_space_id = db_search_space.id
    connector_doc = _onedrive_doc(
        unique_id="od-file-abc",
        search_space_id=target_space_id,
        connector_id=db_connector.id,
        user_id=str(db_user.id),
    )

    pipeline = IndexingPipelineService(session=db_session)
    prepared_docs = await pipeline.prepare_for_indexing([connector_doc])
    assert len(prepared_docs) == 1
    await pipeline.index(prepared_docs[0], connector_doc, llm=mocker.Mock())

    # The stored row must land in the right search space with READY status.
    query_result = await db_session.execute(
        select(Document).filter(Document.search_space_id == target_space_id)
    )
    stored = query_result.scalars().first()
    assert stored is not None
    assert stored.document_type == DocumentType.ONEDRIVE_FILE
    assert DocumentStatus.is_state(stored.status, DocumentStatus.READY)
@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_onedrive_duplicate_content_skipped(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """Re-indexing a OneDrive doc with the same content is skipped (content hash match).

    Fix: the original assigned ``first_id = first_doc.id`` but never used it;
    the dead local is removed. The acceptance condition for the second pass is
    unchanged: either nothing is prepared, or the single prepared entry points
    at the already-indexed document.
    """
    space_id = db_search_space.id
    user_id = str(db_user.id)

    def _dup_doc() -> ConnectorDocument:
        # Same unique_id and identical content both times — triggers the
        # content-hash dedup path on the second prepare.
        return _onedrive_doc(
            unique_id="od-dup-file",
            search_space_id=space_id,
            connector_id=db_connector.id,
            user_id=user_id,
        )

    service = IndexingPipelineService(session=db_session)
    first = _dup_doc()
    prepared = await service.prepare_for_indexing([first])
    assert len(prepared) == 1
    await service.index(prepared[0], first, llm=mocker.Mock())

    result = await db_session.execute(
        select(Document).filter(Document.search_space_id == space_id)
    )
    assert result.scalars().first() is not None

    # Second pass with identical content: skipped outright, or matched to the
    # existing document rather than prepared as new work.
    prepared2 = await service.prepare_for_indexing([_dup_doc()])
    assert len(prepared2) == 0 or (
        len(prepared2) == 1 and prepared2[0].existing_document is not None
    )

View file

@@ -0,0 +1,227 @@
"""Tests for parallel download + indexing in the OneDrive indexer."""
import asyncio
from unittest.mock import AsyncMock, MagicMock
import pytest
from app.db import DocumentType
from app.tasks.connector_indexers.onedrive_indexer import (
_download_files_parallel,
)
pytestmark = pytest.mark.unit
_USER_ID = "00000000-0000-0000-0000-000000000001"
_CONNECTOR_ID = 42
_SEARCH_SPACE_ID = 1
def _make_file_dict(file_id: str, name: str, mime: str = "text/plain") -> dict:
return {
"id": file_id,
"name": name,
"file": {"mimeType": mime},
"lastModifiedDateTime": "2026-01-01T00:00:00Z",
}
def _mock_extract_ok(file_id: str, file_name: str):
return (
f"# Content of {file_name}",
{"onedrive_file_id": file_id, "onedrive_file_name": file_name},
None,
)
@pytest.fixture
def mock_onedrive_client():
    """Stand-in OneDrive client; downloads are patched so no method is ever exercised."""
    client = MagicMock()
    return client
@pytest.fixture
def patch_extract(monkeypatch):
    """Factory fixture that swaps download_and_extract_content for an AsyncMock.

    The returned callable accepts ``side_effect``/``return_value`` and hands
    back the installed mock for later inspection.
    """
    target = "app.tasks.connector_indexers.onedrive_indexer.download_and_extract_content"

    def _patch(side_effect=None, return_value=None):
        extract_mock = AsyncMock(side_effect=side_effect, return_value=return_value)
        monkeypatch.setattr(target, extract_mock)
        return extract_mock

    return _patch
# Slice 1: Tracer bullet
async def test_single_file_returns_one_connector_document(
    mock_onedrive_client, patch_extract,
):
    """One successful download produces exactly one ConnectorDocument, no failures."""
    patch_extract(return_value=_mock_extract_ok("f1", "test.txt"))

    documents, failure_count = await _download_files_parallel(
        mock_onedrive_client,
        [_make_file_dict("f1", "test.txt")],
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    assert failure_count == 0
    assert len(documents) == 1
    produced = documents[0]
    assert produced.title == "test.txt"
    assert produced.unique_id == "f1"
    assert produced.document_type == DocumentType.ONEDRIVE_FILE
# Slice 2: Multiple files all produce documents
async def test_multiple_files_all_produce_documents(
    mock_onedrive_client, patch_extract,
):
    """Three clean downloads yield three documents covering every file id."""
    file_entries = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(3)]
    patch_extract(
        side_effect=[_mock_extract_ok(f"f{i}", f"file{i}.txt") for i in range(3)]
    )

    documents, failure_count = await _download_files_parallel(
        mock_onedrive_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    assert failure_count == 0
    assert len(documents) == 3
    assert {doc.unique_id for doc in documents} == {"f0", "f1", "f2"}
# Slice 3: Error isolation
async def test_one_download_exception_does_not_block_others(
    mock_onedrive_client, patch_extract,
):
    """A raised exception for one file is counted as a failure without losing the rest."""
    file_entries = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(3)]
    # Middle file blows up; its neighbors must still come through.
    patch_extract(
        side_effect=[
            _mock_extract_ok("f0", "file0.txt"),
            RuntimeError("network timeout"),
            _mock_extract_ok("f2", "file2.txt"),
        ]
    )

    documents, failure_count = await _download_files_parallel(
        mock_onedrive_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    assert failure_count == 1
    assert len(documents) == 2
    assert {doc.unique_id for doc in documents} == {"f0", "f2"}
# Slice 4: ETL error counts as download failure
async def test_etl_error_counts_as_download_failure(
    mock_onedrive_client, patch_extract,
):
    """An ETL error returned in the result triple is tallied as a failed download."""
    file_entries = [_make_file_dict("f0", "good.txt"), _make_file_dict("f1", "bad.txt")]
    # Second file returns (None, {}, error) instead of raising — still a failure.
    patch_extract(
        side_effect=[
            _mock_extract_ok("f0", "good.txt"),
            (None, {}, "ETL failed"),
        ]
    )

    documents, failure_count = await _download_files_parallel(
        mock_onedrive_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    assert failure_count == 1
    assert len(documents) == 1
# Slice 5: Semaphore bound
async def test_concurrency_bounded_by_semaphore(
    mock_onedrive_client, monkeypatch,
):
    """With max_concurrency=2, no more than two downloads are in flight at once."""
    counter_lock = asyncio.Lock()
    in_flight = 0
    max_in_flight = 0

    async def _slow_extract(client, file):
        nonlocal in_flight, max_in_flight
        async with counter_lock:
            in_flight += 1
            max_in_flight = max(max_in_flight, in_flight)
        # Sleep long enough that overlapping downloads would be observed.
        await asyncio.sleep(0.05)
        async with counter_lock:
            in_flight -= 1
        return _mock_extract_ok(file["id"], file["name"])

    monkeypatch.setattr(
        "app.tasks.connector_indexers.onedrive_indexer.download_and_extract_content",
        _slow_extract,
    )

    file_entries = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(6)]
    documents, failure_count = await _download_files_parallel(
        mock_onedrive_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
        max_concurrency=2,
    )

    assert failure_count == 0
    assert len(documents) == 6
    assert max_in_flight <= 2, f"Peak concurrency was {max_in_flight}, expected <= 2"
# Slice 6: Heartbeat fires
async def test_heartbeat_fires_during_parallel_downloads(
    mock_onedrive_client, monkeypatch,
):
    """With a zero heartbeat interval, the on_heartbeat callback fires at least once."""
    import app.tasks.connector_indexers.onedrive_indexer as _mod

    # Zero interval guarantees a heartbeat during the slow downloads below.
    monkeypatch.setattr(_mod, "HEARTBEAT_INTERVAL_SECONDS", 0)

    async def _slow_extract(client, file):
        await asyncio.sleep(0.05)
        return _mock_extract_ok(file["id"], file["name"])

    monkeypatch.setattr(
        "app.tasks.connector_indexers.onedrive_indexer.download_and_extract_content",
        _slow_extract,
    )

    heartbeat_calls: list[int] = []

    async def _on_heartbeat(count: int):
        heartbeat_calls.append(count)

    file_entries = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(3)]
    documents, failure_count = await _download_files_parallel(
        mock_onedrive_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
        on_heartbeat=_on_heartbeat,
    )

    assert failure_count == 0
    assert len(documents) == 3
    assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once"