mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-04-25 00:36:31 +02:00
feat: add integration and unit tests for Dropbox indexing pipeline and parallel downloads
This commit is contained in:
parent
af115ddae4
commit
272de1bb40
2 changed files with 342 additions and 0 deletions
|
|
@ -0,0 +1,106 @@
|
|||
"""Integration tests: Dropbox ConnectorDocuments flow through the pipeline."""
|
||||
|
||||
import pytest
|
||||
from sqlalchemy import select
|
||||
|
||||
from app.config import config as app_config
|
||||
from app.db import Document, DocumentStatus, DocumentType
|
||||
from app.indexing_pipeline.connector_document import ConnectorDocument
|
||||
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
|
||||
|
||||
_EMBEDDING_DIM = app_config.embedding_model_instance.dimension
|
||||
|
||||
pytestmark = pytest.mark.integration
|
||||
|
||||
|
||||
def _dropbox_doc(
    *, unique_id: str, search_space_id: int, connector_id: int, user_id: str
) -> ConnectorDocument:
    """Build a minimal Dropbox ConnectorDocument suitable for pipeline tests."""
    file_name = f"{unique_id}.docx"
    file_metadata = {
        "dropbox_file_id": unique_id,
        "dropbox_file_name": file_name,
        "document_type": "Dropbox File",
    }
    return ConnectorDocument(
        title=f"File {file_name}",
        source_markdown=f"## Document\n\nContent from {unique_id}",
        unique_id=unique_id,
        document_type=DocumentType.DROPBOX_FILE,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=True,
        fallback_summary=f"File: {file_name}",
        metadata=file_metadata,
    )
||||
|
||||
@pytest.mark.usefixtures(
    "patched_summarize", "patched_embed_texts", "patched_chunk_text"
)
async def test_dropbox_pipeline_creates_ready_document(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """A Dropbox ConnectorDocument flows through prepare + index to a READY document."""
    search_space_id = db_search_space.id
    connector_doc = _dropbox_doc(
        unique_id="db-file-abc",
        search_space_id=search_space_id,
        connector_id=db_connector.id,
        user_id=str(db_user.id),
    )

    pipeline = IndexingPipelineService(session=db_session)
    prepared_docs = await pipeline.prepare_for_indexing([connector_doc])
    assert len(prepared_docs) == 1

    await pipeline.index(prepared_docs[0], connector_doc, llm=mocker.Mock())

    # Fetch whatever landed in this search space and verify its final state.
    query = select(Document).filter(Document.search_space_id == search_space_id)
    stored = (await db_session.execute(query)).scalars().first()

    assert stored is not None
    assert stored.document_type == DocumentType.DROPBOX_FILE
    assert DocumentStatus.is_state(stored.status, DocumentStatus.READY)
||||
|
||||
@pytest.mark.usefixtures(
    "patched_summarize", "patched_embed_texts", "patched_chunk_text"
)
async def test_dropbox_duplicate_content_skipped(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """Re-indexing a Dropbox doc with the same content is skipped (content hash match)."""
    search_space_id = db_search_space.id
    owner_id = str(db_user.id)

    original = _dropbox_doc(
        unique_id="db-dup-file",
        search_space_id=search_space_id,
        connector_id=db_connector.id,
        user_id=owner_id,
    )

    pipeline = IndexingPipelineService(session=db_session)

    prepared = await pipeline.prepare_for_indexing([original])
    assert len(prepared) == 1
    await pipeline.index(prepared[0], original, llm=mocker.Mock())

    query = select(Document).filter(Document.search_space_id == search_space_id)
    stored = (await db_session.execute(query)).scalars().first()
    assert stored is not None

    # Same unique_id and identical markdown -> identical content hash.
    duplicate = _dropbox_doc(
        unique_id="db-dup-file",
        search_space_id=search_space_id,
        connector_id=db_connector.id,
        user_id=owner_id,
    )

    prepared_again = await pipeline.prepare_for_indexing([duplicate])
    # Either the duplicate is filtered out entirely, or it comes back
    # annotated with the already-existing document.
    if prepared_again:
        assert len(prepared_again) == 1
        assert prepared_again[0].existing_document is not None
|
|
@ -0,0 +1,236 @@
|
|||
"""Tests for parallel download + indexing in the Dropbox indexer."""
|
||||
|
||||
import asyncio
|
||||
from unittest.mock import AsyncMock, MagicMock
|
||||
|
||||
import pytest
|
||||
|
||||
from app.db import DocumentType
|
||||
from app.tasks.connector_indexers.dropbox_indexer import (
|
||||
_download_files_parallel,
|
||||
)
|
||||
|
||||
pytestmark = pytest.mark.unit
|
||||
|
||||
_USER_ID = "00000000-0000-0000-0000-000000000001"
|
||||
_CONNECTOR_ID = 42
|
||||
_SEARCH_SPACE_ID = 1
|
||||
|
||||
|
||||
def _make_file_dict(file_id: str, name: str) -> dict:
|
||||
return {
|
||||
"id": file_id,
|
||||
"name": name,
|
||||
".tag": "file",
|
||||
"path_lower": f"/{name}",
|
||||
"server_modified": "2026-01-01T00:00:00Z",
|
||||
"content_hash": f"hash_{file_id}",
|
||||
}
|
||||
|
||||
|
||||
def _mock_extract_ok(file_id: str, file_name: str):
|
||||
return (
|
||||
f"# Content of {file_name}",
|
||||
{"dropbox_file_id": file_id, "dropbox_file_name": file_name},
|
||||
None,
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture
def mock_dropbox_client():
    """A stand-in Dropbox client; no real API calls are ever made."""
    client = MagicMock()
    return client
||||
|
||||
@pytest.fixture
def patch_extract(monkeypatch):
    """Factory fixture: patch download_and_extract_content with an AsyncMock.

    The returned callable installs the patch and hands back the mock so
    tests can configure side effects or inspect calls.
    """

    def _apply(side_effect=None, return_value=None):
        stub = AsyncMock(side_effect=side_effect, return_value=return_value)
        monkeypatch.setattr(
            "app.tasks.connector_indexers.dropbox_indexer.download_and_extract_content",
            stub,
        )
        return stub

    return _apply
||||
|
||||
# Slice 1: Tracer bullet
async def test_single_file_returns_one_connector_document(
    mock_dropbox_client,
    patch_extract,
):
    """A single successful download yields exactly one ConnectorDocument."""
    patch_extract(return_value=_mock_extract_ok("f1", "test.txt"))

    documents, failure_count = await _download_files_parallel(
        mock_dropbox_client,
        [_make_file_dict("f1", "test.txt")],
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    assert failure_count == 0
    assert len(documents) == 1
    only_doc = documents[0]
    assert only_doc.title == "test.txt"
    assert only_doc.unique_id == "f1"
    assert only_doc.document_type == DocumentType.DROPBOX_FILE
||||
|
||||
# Slice 2: Multiple files all produce documents
async def test_multiple_files_all_produce_documents(
    mock_dropbox_client,
    patch_extract,
):
    """Every file in a multi-file batch becomes its own ConnectorDocument."""
    file_entries = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(3)]
    patch_extract(
        side_effect=[_mock_extract_ok(f"f{i}", f"file{i}.txt") for i in range(3)]
    )

    documents, failure_count = await _download_files_parallel(
        mock_dropbox_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    assert failure_count == 0
    assert len(documents) == 3
    assert {document.unique_id for document in documents} == {"f0", "f1", "f2"}
||||
|
||||
# Slice 3: Error isolation
async def test_one_download_exception_does_not_block_others(
    mock_dropbox_client,
    patch_extract,
):
    """An exception for one file counts as a failure without losing the rest."""
    file_entries = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(3)]
    # Middle file raises; its neighbours succeed.
    patch_extract(
        side_effect=[
            _mock_extract_ok("f0", "file0.txt"),
            RuntimeError("network timeout"),
            _mock_extract_ok("f2", "file2.txt"),
        ]
    )

    documents, failure_count = await _download_files_parallel(
        mock_dropbox_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    assert failure_count == 1
    assert len(documents) == 2
    assert {document.unique_id for document in documents} == {"f0", "f2"}
||||
|
||||
# Slice 4: ETL error counts as download failure
async def test_etl_error_counts_as_download_failure(
    mock_dropbox_client,
    patch_extract,
):
    """A non-None ETL error string marks the file as failed instead of raising."""
    file_entries = [
        _make_file_dict("f0", "good.txt"),
        _make_file_dict("f1", "bad.txt"),
    ]
    patch_extract(
        side_effect=[
            _mock_extract_ok("f0", "good.txt"),
            (None, {}, "ETL failed"),
        ]
    )

    documents, failure_count = await _download_files_parallel(
        mock_dropbox_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
    )

    assert failure_count == 1
    assert len(documents) == 1
||||
|
||||
# Slice 5: Semaphore bound
async def test_concurrency_bounded_by_semaphore(
    mock_dropbox_client,
    monkeypatch,
):
    """With max_concurrency=2, no more than two downloads run simultaneously."""
    counter_lock = asyncio.Lock()
    in_flight = 0
    max_in_flight = 0

    async def _tracked_extract(client, file):
        nonlocal in_flight, max_in_flight
        async with counter_lock:
            in_flight += 1
            max_in_flight = max(max_in_flight, in_flight)
        # Hold the slot long enough for other tasks to pile up behind it.
        await asyncio.sleep(0.05)
        async with counter_lock:
            in_flight -= 1
        return _mock_extract_ok(file["id"], file["name"])

    monkeypatch.setattr(
        "app.tasks.connector_indexers.dropbox_indexer.download_and_extract_content",
        _tracked_extract,
    )

    file_entries = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(6)]

    documents, failure_count = await _download_files_parallel(
        mock_dropbox_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
        max_concurrency=2,
    )

    assert failure_count == 0
    assert len(documents) == 6
    assert max_in_flight <= 2, f"Peak concurrency was {max_in_flight}, expected <= 2"
||||
|
||||
# Slice 6: Heartbeat fires
async def test_heartbeat_fires_during_parallel_downloads(
    mock_dropbox_client,
    monkeypatch,
):
    """With a zero heartbeat interval, the callback fires while downloads run."""
    import app.tasks.connector_indexers.dropbox_indexer as _mod

    # Interval 0 means the heartbeat is due immediately and on every check.
    monkeypatch.setattr(_mod, "HEARTBEAT_INTERVAL_SECONDS", 0)

    async def _slow_extract(client, file):
        await asyncio.sleep(0.05)
        return _mock_extract_ok(file["id"], file["name"])

    monkeypatch.setattr(
        "app.tasks.connector_indexers.dropbox_indexer.download_and_extract_content",
        _slow_extract,
    )

    observed_counts: list[int] = []

    async def _record_heartbeat(count: int):
        observed_counts.append(count)

    file_entries = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(3)]

    documents, failure_count = await _download_files_parallel(
        mock_dropbox_client,
        file_entries,
        connector_id=_CONNECTOR_ID,
        search_space_id=_SEARCH_SPACE_ID,
        user_id=_USER_ID,
        enable_summary=True,
        on_heartbeat=_record_heartbeat,
    )

    assert failure_count == 0
    assert len(documents) == 3
    assert len(observed_counts) >= 1, "Heartbeat should have fired at least once"
|
||||
Loading…
Add table
Add a link
Reference in a new issue