diff --git a/docker/.env.example b/docker/.env.example
index 3fb02d612..8297aae44 100644
--- a/docker/.env.example
+++ b/docker/.env.example
@@ -203,7 +203,7 @@ STT_SERVICE=local/base
 # AIRTABLE_CLIENT_SECRET=
 # AIRTABLE_REDIRECT_URI=http://localhost:8000/api/v1/auth/airtable/connector/callback
 
-# -- Microsoft OAuth (shared for Teams and OneDrive) --
+# -- Microsoft OAuth (Teams & OneDrive) --
 # MICROSOFT_CLIENT_ID=
 # MICROSOFT_CLIENT_SECRET=
 # TEAMS_REDIRECT_URI=http://localhost:8000/api/v1/auth/teams/connector/callback
diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example
index 0b2cda19b..0361e2467 100644
--- a/surfsense_backend/.env.example
+++ b/surfsense_backend/.env.example
@@ -74,7 +74,7 @@ DISCORD_CLIENT_SECRET=your_discord_client_secret_here
 DISCORD_REDIRECT_URI=http://localhost:8000/api/v1/auth/discord/connector/callback
 DISCORD_BOT_TOKEN=your_bot_token_from_developer_portal
 
-# Atlassian OAuth Configuration
+# Atlassian OAuth Configuration (Jira & Confluence)
 ATLASSIAN_CLIENT_ID=your_atlassian_client_id_here
 ATLASSIAN_CLIENT_SECRET=your_atlassian_client_secret_here
 JIRA_REDIRECT_URI=http://localhost:8000/api/v1/auth/jira/connector/callback
@@ -95,7 +95,7 @@ SLACK_CLIENT_ID=your_slack_client_id_here
 SLACK_CLIENT_SECRET=your_slack_client_secret_here
 SLACK_REDIRECT_URI=http://localhost:8000/api/v1/auth/slack/connector/callback
 
-# Microsoft OAuth (shared for Teams and OneDrive)
+# Microsoft OAuth (Teams & OneDrive)
 MICROSOFT_CLIENT_ID=your_microsoft_client_id_here
 MICROSOFT_CLIENT_SECRET=your_microsoft_client_secret_here
 TEAMS_REDIRECT_URI=http://localhost:8000/api/v1/auth/teams/connector/callback
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_onedrive_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_onedrive_pipeline.py
new file mode 100644
index 000000000..ee83795a5
--- /dev/null
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_onedrive_pipeline.py
@@ -0,0 +1,100 @@
+"""Integration tests: OneDrive ConnectorDocuments flow through the pipeline."""
+
+import pytest
+from sqlalchemy import select
+
+from app.config import config as app_config
+from app.db import Document, DocumentStatus, DocumentType
+from app.indexing_pipeline.connector_document import ConnectorDocument
+from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
+
+_EMBEDDING_DIM = app_config.embedding_model_instance.dimension
+
+pytestmark = pytest.mark.integration
+
+
+def _onedrive_doc(*, unique_id: str, search_space_id: int, connector_id: int, user_id: str) -> ConnectorDocument:
+    return ConnectorDocument(
+        title=f"File {unique_id}.docx",
+        source_markdown=f"## Document\n\nContent from {unique_id}",
+        unique_id=unique_id,
+        document_type=DocumentType.ONEDRIVE_FILE,
+        search_space_id=search_space_id,
+        connector_id=connector_id,
+        created_by_id=user_id,
+        should_summarize=True,
+        fallback_summary=f"File: {unique_id}.docx",
+        metadata={
+            "onedrive_file_id": unique_id,
+            "onedrive_file_name": f"{unique_id}.docx",
+            "document_type": "OneDrive File",
+        },
+    )
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_onedrive_pipeline_creates_ready_document(
+    db_session, db_search_space, db_connector, db_user, mocker
+):
+    """A OneDrive ConnectorDocument flows through prepare + index to a READY document."""
+    space_id = db_search_space.id
+    doc = _onedrive_doc(
+        unique_id="od-file-abc",
+        search_space_id=space_id,
+        connector_id=db_connector.id,
+        user_id=str(db_user.id),
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    prepared = await service.prepare_for_indexing([doc])
+    assert len(prepared) == 1
+
+    await service.index(prepared[0], doc, llm=mocker.Mock())
+
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == space_id)
+    )
+    row = result.scalars().first()
+
+    assert row is not None
+    assert row.document_type == DocumentType.ONEDRIVE_FILE
+    assert DocumentStatus.is_state(row.status, DocumentStatus.READY)
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_onedrive_duplicate_content_skipped(
+    db_session, db_search_space, db_connector, db_user, mocker
+):
+    """Re-indexing a OneDrive doc with the same content is skipped (content hash match)."""
+    space_id = db_search_space.id
+    user_id = str(db_user.id)
+
+    doc = _onedrive_doc(
+        unique_id="od-dup-file",
+        search_space_id=space_id,
+        connector_id=db_connector.id,
+        user_id=user_id,
+    )
+
+    service = IndexingPipelineService(session=db_session)
+
+    prepared = await service.prepare_for_indexing([doc])
+    assert len(prepared) == 1
+    await service.index(prepared[0], doc, llm=mocker.Mock())
+
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == space_id)
+    )
+    first_doc = result.scalars().first()
+    assert first_doc is not None
+    first_id = first_doc.id
+
+    doc2 = _onedrive_doc(
+        unique_id="od-dup-file",
+        search_space_id=space_id,
+        connector_id=db_connector.id,
+        user_id=user_id,
+    )
+
+    prepared2 = await service.prepare_for_indexing([doc2])
+    assert len(prepared2) == 0 or (len(prepared2) == 1 and prepared2[0].existing_document is not None)
diff --git a/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py
new file mode 100644
index 000000000..b5c774c6f
--- /dev/null
+++ b/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py
@@ -0,0 +1,227 @@
+"""Tests for parallel download + indexing in the OneDrive indexer."""
+
+import asyncio
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from app.db import DocumentType
+from app.tasks.connector_indexers.onedrive_indexer import (
+    _download_files_parallel,
+)
+
+pytestmark = pytest.mark.unit
+
+_USER_ID = "00000000-0000-0000-0000-000000000001"
+_CONNECTOR_ID = 42
+_SEARCH_SPACE_ID = 1
+
+
+def _make_file_dict(file_id: str, name: str, mime: str = "text/plain") -> dict:
+    return {
+        "id": file_id,
+        "name": name,
+        "file": {"mimeType": mime},
+        "lastModifiedDateTime": "2026-01-01T00:00:00Z",
+    }
+
+
+def _mock_extract_ok(file_id: str, file_name: str):
+    return (
+        f"# Content of {file_name}",
+        {"onedrive_file_id": file_id, "onedrive_file_name": file_name},
+        None,
+    )
+
+
+@pytest.fixture
+def mock_onedrive_client():
+    return MagicMock()
+
+
+@pytest.fixture
+def patch_extract(monkeypatch):
+    def _patch(side_effect=None, return_value=None):
+        mock = AsyncMock(side_effect=side_effect, return_value=return_value)
+        monkeypatch.setattr(
+            "app.tasks.connector_indexers.onedrive_indexer.download_and_extract_content",
+            mock,
+        )
+        return mock
+    return _patch
+
+
+# Slice 1: Tracer bullet
+async def test_single_file_returns_one_connector_document(
+    mock_onedrive_client, patch_extract,
+):
+    patch_extract(return_value=_mock_extract_ok("f1", "test.txt"))
+
+    docs, failed = await _download_files_parallel(
+        mock_onedrive_client,
+        [_make_file_dict("f1", "test.txt")],
+        connector_id=_CONNECTOR_ID,
+        search_space_id=_SEARCH_SPACE_ID,
+        user_id=_USER_ID,
+        enable_summary=True,
+    )
+
+    assert len(docs) == 1
+    assert failed == 0
+    assert docs[0].title == "test.txt"
+    assert docs[0].unique_id == "f1"
+    assert docs[0].document_type == DocumentType.ONEDRIVE_FILE
+
+
+# Slice 2: Multiple files all produce documents
+async def test_multiple_files_all_produce_documents(
+    mock_onedrive_client, patch_extract,
+):
+    files = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(3)]
+    patch_extract(
+        side_effect=[_mock_extract_ok(f"f{i}", f"file{i}.txt") for i in range(3)]
+    )
+
+    docs, failed = await _download_files_parallel(
+        mock_onedrive_client,
+        files,
+        connector_id=_CONNECTOR_ID,
+        search_space_id=_SEARCH_SPACE_ID,
+        user_id=_USER_ID,
+        enable_summary=True,
+    )
+
+    assert len(docs) == 3
+    assert failed == 0
+    assert {d.unique_id for d in docs} == {"f0", "f1", "f2"}
+
+
+# Slice 3: Error isolation
+async def test_one_download_exception_does_not_block_others(
+    mock_onedrive_client, patch_extract,
+):
+    files = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(3)]
+    patch_extract(
+        side_effect=[
+            _mock_extract_ok("f0", "file0.txt"),
+            RuntimeError("network timeout"),
+            _mock_extract_ok("f2", "file2.txt"),
+        ]
+    )
+
+    docs, failed = await _download_files_parallel(
+        mock_onedrive_client,
+        files,
+        connector_id=_CONNECTOR_ID,
+        search_space_id=_SEARCH_SPACE_ID,
+        user_id=_USER_ID,
+        enable_summary=True,
+    )
+
+    assert len(docs) == 2
+    assert failed == 1
+    assert {d.unique_id for d in docs} == {"f0", "f2"}
+
+
+# Slice 4: ETL error counts as download failure
+async def test_etl_error_counts_as_download_failure(
+    mock_onedrive_client, patch_extract,
+):
+    files = [_make_file_dict("f0", "good.txt"), _make_file_dict("f1", "bad.txt")]
+    patch_extract(
+        side_effect=[
+            _mock_extract_ok("f0", "good.txt"),
+            (None, {}, "ETL failed"),
+        ]
+    )
+
+    docs, failed = await _download_files_parallel(
+        mock_onedrive_client,
+        files,
+        connector_id=_CONNECTOR_ID,
+        search_space_id=_SEARCH_SPACE_ID,
+        user_id=_USER_ID,
+        enable_summary=True,
+    )
+
+    assert len(docs) == 1
+    assert failed == 1
+
+
+# Slice 5: Semaphore bound
+async def test_concurrency_bounded_by_semaphore(
+    mock_onedrive_client, monkeypatch,
+):
+    lock = asyncio.Lock()
+    active = 0
+    peak = 0
+
+    async def _slow_extract(client, file):
+        nonlocal active, peak
+        async with lock:
+            active += 1
+            peak = max(peak, active)
+        await asyncio.sleep(0.05)
+        async with lock:
+            active -= 1
+        return _mock_extract_ok(file["id"], file["name"])
+
+    monkeypatch.setattr(
+        "app.tasks.connector_indexers.onedrive_indexer.download_and_extract_content",
+        _slow_extract,
+    )
+
+    files = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(6)]
+
+    docs, failed = await _download_files_parallel(
+        mock_onedrive_client,
+        files,
+        connector_id=_CONNECTOR_ID,
+        search_space_id=_SEARCH_SPACE_ID,
+        user_id=_USER_ID,
+        enable_summary=True,
+        max_concurrency=2,
+    )
+
+    assert len(docs) == 6
+    assert failed == 0
+    assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2"
+
+
+# Slice 6: Heartbeat fires
+async def test_heartbeat_fires_during_parallel_downloads(
+    mock_onedrive_client, monkeypatch,
+):
+    import app.tasks.connector_indexers.onedrive_indexer as _mod
+
+    monkeypatch.setattr(_mod, "HEARTBEAT_INTERVAL_SECONDS", 0)
+
+    async def _slow_extract(client, file):
+        await asyncio.sleep(0.05)
+        return _mock_extract_ok(file["id"], file["name"])
+
+    monkeypatch.setattr(
+        "app.tasks.connector_indexers.onedrive_indexer.download_and_extract_content",
+        _slow_extract,
+    )
+
+    heartbeat_calls: list[int] = []
+
+    async def _on_heartbeat(count: int):
+        heartbeat_calls.append(count)
+
+    files = [_make_file_dict(f"f{i}", f"file{i}.txt") for i in range(3)]
+
+    docs, failed = await _download_files_parallel(
+        mock_onedrive_client,
+        files,
+        connector_id=_CONNECTOR_ID,
+        search_space_id=_SEARCH_SPACE_ID,
+        user_id=_USER_ID,
+        enable_summary=True,
+        on_heartbeat=_on_heartbeat,
+    )
+
+    assert len(docs) == 3
+    assert failed == 0
+    assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once"