test: add unit tests for Dropbox integration, covering delta sync methods, file type filtering, and re-authentication behavior

This commit is contained in:
Anish Sarkar 2026-04-06 18:36:48 +05:30
parent b5a15b7681
commit caca491774
7 changed files with 843 additions and 0 deletions

View file

@ -8,6 +8,10 @@ import pytest
from app.db import DocumentType
from app.tasks.connector_indexers.dropbox_indexer import (
_download_files_parallel,
_index_full_scan,
_index_selected_files,
_index_with_delta_sync,
index_dropbox_files,
)
pytestmark = pytest.mark.unit
@ -234,3 +238,544 @@ async def test_heartbeat_fires_during_parallel_downloads(
assert len(docs) == 3
assert failed == 0
assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once"
# ---------------------------------------------------------------------------
# D1-D2: _index_full_scan tests
# ---------------------------------------------------------------------------
def _folder_dict(name: str) -> dict:
    """Return a minimal folder-style entry: a ``.tag`` of "folder" plus *name*."""
    entry = {".tag": "folder"}
    entry["name"] = name
    return entry
@pytest.fixture
def full_scan_mocks(mock_dropbox_client, monkeypatch):
    """Patch the indexer module so ``_index_full_scan`` can run against fakes.

    ``_should_skip_file`` and ``_download_and_index`` are replaced on the
    indexer module. Tests steer the skip decision by filling ``skip_results``
    (keyed by file id) and the batch outcome through
    ``download_and_index_mock.return_value``.
    """
    import app.tasks.connector_indexers.dropbox_indexer as _mod

    skip_results: dict[str, tuple[bool, str | None]] = {}

    async def _fake_skip(session, file, search_space_id):
        from app.connectors.dropbox.file_types import should_skip_file as _skip

        # The real file-type filter runs first; only entries it lets through
        # consult the per-test overrides (default: do not skip).
        if not _skip(file):
            return skip_results.get(file.get("id", ""), (False, None))
        return True, "folder/non-downloadable"

    download_and_index_mock = AsyncMock(return_value=(0, 0))
    monkeypatch.setattr(_mod, "_should_skip_file", _fake_skip)
    monkeypatch.setattr(_mod, "_download_and_index", download_and_index_mock)

    task_logger = MagicMock()
    task_logger.log_task_progress = AsyncMock()

    return dict(
        dropbox_client=mock_dropbox_client,
        session=AsyncMock(),
        task_logger=task_logger,
        log_entry=MagicMock(),
        skip_results=skip_results,
        download_and_index_mock=download_and_index_mock,
    )
async def _run_full_scan(mocks, monkeypatch, page_files, *, max_files=500):
    """Invoke ``_index_full_scan`` with ``get_files_in_folder`` stubbed.

    The stub returns ``(page_files, None)``. *mocks* is the dict produced by
    the ``full_scan_mocks`` fixture. Returns whatever ``_index_full_scan``
    returns.
    """
    import app.tasks.connector_indexers.dropbox_indexer as _mod

    listing_stub = AsyncMock(return_value=(page_files, None))
    monkeypatch.setattr(_mod, "get_files_in_folder", listing_stub)

    positional_args = (
        mocks["dropbox_client"],
        mocks["session"],
        _CONNECTOR_ID,
        _SEARCH_SPACE_ID,
        _USER_ID,
        "",  # presumably the folder path (root) -- confirm against the indexer signature
        "Root",  # presumably the folder display name -- confirm
        mocks["task_logger"],
        mocks["log_entry"],
        max_files,
    )
    return await _index_full_scan(*positional_args, enable_summary=True)
async def test_full_scan_three_phase_counts(full_scan_mocks, monkeypatch):
    """Skipped files are excluded, renames count as indexed, new files are downloaded."""
    overrides = full_scan_mocks["skip_results"]
    overrides["skip1"] = (True, "unchanged")
    overrides["rename1"] = (True, "File renamed: 'old' -> 'renamed.txt'")

    download_mock = full_scan_mocks["download_and_index_mock"]
    download_mock.return_value = (2, 0)

    listing = [_folder_dict("SubFolder")]
    listing += [
        _make_file_dict(file_id, file_name)
        for file_id, file_name in (
            ("skip1", "unchanged.txt"),
            ("rename1", "renamed.txt"),
            ("new1", "new1.txt"),
            ("new2", "new2.txt"),
        )
    ]

    indexed, skipped = await _run_full_scan(full_scan_mocks, monkeypatch, listing)

    assert indexed == 3  # 1 renamed + 2 from batch
    assert skipped == 2  # 1 folder + 1 unchanged

    # Third positional argument of _download_and_index is the batch of files.
    batch = download_mock.call_args.args[2]
    assert len(batch) == 2
    assert {entry["id"] for entry in batch} == {"new1", "new2"}
async def test_full_scan_respects_max_files(full_scan_mocks, monkeypatch):
    """With ten files listed and max_files=3, only three reach the download batch."""
    download_mock = full_scan_mocks["download_and_index_mock"]
    download_mock.return_value = (3, 0)

    listing = []
    for i in range(10):
        listing.append(_make_file_dict(f"f{i}", f"file{i}.txt"))

    await _run_full_scan(full_scan_mocks, monkeypatch, listing, max_files=3)

    batch = download_mock.call_args.args[2]
    assert len(batch) == 3
# ---------------------------------------------------------------------------
# D3-D5: _index_selected_files tests
# ---------------------------------------------------------------------------
@pytest.fixture
def selected_files_mocks(mock_dropbox_client, monkeypatch):
    """Patch the indexer module so ``_index_selected_files`` runs against fakes.

    Replaces ``get_file_by_path``, ``_should_skip_file`` and
    ``_download_and_index`` on the indexer module. Tests configure lookups via
    ``get_file_results`` (keyed by path) and skip decisions via
    ``skip_results`` (keyed by file id).
    """
    import app.tasks.connector_indexers.dropbox_indexer as _mod

    get_file_results: dict[str, tuple[dict | None, str | None]] = {}
    skip_results: dict[str, tuple[bool, str | None]] = {}

    async def _fake_get_file(client, path):
        # Paths a test did not configure come back as an error tuple.
        if path in get_file_results:
            return get_file_results[path]
        return None, f"Not configured: {path}"

    async def _fake_skip(session, file, search_space_id):
        # Files without an override default to "do not skip".
        return skip_results.get(file["id"], (False, None))

    download_and_index_mock = AsyncMock(return_value=(0, 0))

    replacements = (
        ("get_file_by_path", _fake_get_file),
        ("_should_skip_file", _fake_skip),
        ("_download_and_index", download_and_index_mock),
    )
    for attr_name, fake in replacements:
        monkeypatch.setattr(_mod, attr_name, fake)

    return dict(
        dropbox_client=mock_dropbox_client,
        session=AsyncMock(),
        get_file_results=get_file_results,
        skip_results=skip_results,
        download_and_index_mock=download_and_index_mock,
    )
async def _run_selected(mocks, file_tuples):
    """Call ``_index_selected_files`` with the fixture's fakes and the shared test ids.

    *file_tuples* is passed through unchanged; the tests supply
    ``(path, name)`` pairs.
    """
    scope = {
        "connector_id": _CONNECTOR_ID,
        "search_space_id": _SEARCH_SPACE_ID,
        "user_id": _USER_ID,
        "enable_summary": True,
    }
    client, session = mocks["dropbox_client"], mocks["session"]
    return await _index_selected_files(client, session, file_tuples, **scope)
async def test_selected_files_single_file_indexed(selected_files_mocks):
    """One resolvable file with one successful download yields (1, 0, [])."""
    lookups = selected_files_mocks["get_file_results"]
    lookups["/report.pdf"] = (_make_file_dict("f1", "report.pdf"), None)
    selected_files_mocks["download_and_index_mock"].return_value = (1, 0)

    selection = [("/report.pdf", "report.pdf")]
    indexed, skipped, errors = await _run_selected(selected_files_mocks, selection)

    assert indexed == 1
    assert skipped == 0
    assert errors == []
async def test_selected_files_fetch_failure_isolation(selected_files_mocks):
    """A failed lookup for one path is reported as an error without blocking the others."""
    lookups = selected_files_mocks["get_file_results"]
    lookups["/first.txt"] = (_make_file_dict("f1", "first.txt"), None)
    lookups["/mid.txt"] = (None, "HTTP 404")
    lookups["/third.txt"] = (_make_file_dict("f3", "third.txt"), None)
    selected_files_mocks["download_and_index_mock"].return_value = (2, 0)

    selection = [
        (f"/{name}", name) for name in ("first.txt", "mid.txt", "third.txt")
    ]
    indexed, skipped, errors = await _run_selected(selected_files_mocks, selection)

    assert indexed == 2
    assert skipped == 0
    assert len(errors) == 1
    assert "mid.txt" in errors[0]
async def test_selected_files_skip_rename_counting(selected_files_mocks):
    """Unchanged files count as skipped, renames as indexed, and only new files are batched."""
    cases = (
        ("/unchanged.txt", "s1", "unchanged.txt"),
        ("/renamed.txt", "r1", "renamed.txt"),
        ("/new1.txt", "n1", "new1.txt"),
        ("/new2.txt", "n2", "new2.txt"),
    )
    lookups = selected_files_mocks["get_file_results"]
    for path, file_id, file_name in cases:
        lookups[path] = (_make_file_dict(file_id, file_name), None)

    overrides = selected_files_mocks["skip_results"]
    overrides["s1"] = (True, "unchanged")
    overrides["r1"] = (True, "File renamed: 'old' -> 'renamed.txt'")

    download_mock = selected_files_mocks["download_and_index_mock"]
    download_mock.return_value = (2, 0)

    selection = [(path, file_name) for path, _file_id, file_name in cases]
    indexed, skipped, errors = await _run_selected(selected_files_mocks, selection)

    assert indexed == 3  # 1 renamed + 2 batch
    assert skipped == 1
    assert errors == []

    # Third positional argument of _download_and_index is the batch of files.
    batch = download_mock.call_args.args[2]
    assert len(batch) == 2
    assert {entry["id"] for entry in batch} == {"n1", "n2"}
# ---------------------------------------------------------------------------
# E1-E4: _index_with_delta_sync tests
# ---------------------------------------------------------------------------
async def test_delta_sync_deletions_call_remove_document(monkeypatch):
    """E1: deleted entries are processed via _remove_document."""
    import app.tasks.connector_indexers.dropbox_indexer as _mod

    removed_ids: list[str] = []

    async def _fake_remove(session, file_id, search_space_id):
        removed_ids.append(file_id)

    monkeypatch.setattr(_mod, "_remove_document", _fake_remove)
    monkeypatch.setattr(_mod, "_download_and_index", AsyncMock(return_value=(0, 0)))

    deleted_entries = [
        {".tag": "deleted", "name": name, "path_lower": f"/{name}", "id": file_id}
        for name, file_id in (("gone.txt", "id:del1"), ("also_gone.pdf", "id:del2"))
    ]
    client = MagicMock()
    client.get_changes = AsyncMock(return_value=(deleted_entries, "new-cursor", None))

    task_logger = MagicMock()
    task_logger.log_task_progress = AsyncMock()

    # Only the removals and the returned cursor are checked here.
    _indexed, _skipped, cursor = await _index_with_delta_sync(
        client,
        AsyncMock(),
        _CONNECTOR_ID,
        _SEARCH_SPACE_ID,
        _USER_ID,
        "old-cursor",
        task_logger,
        MagicMock(),
        max_files=500,
        enable_summary=True,
    )

    assert sorted(removed_ids) == ["id:del1", "id:del2"]
    assert cursor == "new-cursor"
async def test_delta_sync_upserts_filtered_and_downloaded(monkeypatch):
    """E2: modified/new file entries go through skip filter then download+index."""
    import app.tasks.connector_indexers.dropbox_indexer as _mod

    never_skip = AsyncMock(return_value=(False, None))
    download_mock = AsyncMock(return_value=(2, 0))
    monkeypatch.setattr(_mod, "_should_skip_file", never_skip)
    monkeypatch.setattr(_mod, "_download_and_index", download_mock)

    changed = [
        _make_file_dict(file_id, file_name)
        for file_id, file_name in (("mod1", "modified1.txt"), ("mod2", "modified2.txt"))
    ]
    client = MagicMock()
    client.get_changes = AsyncMock(return_value=(changed, "cursor-v2", None))

    task_logger = MagicMock()
    task_logger.log_task_progress = AsyncMock()

    indexed, skipped, cursor = await _index_with_delta_sync(
        client,
        AsyncMock(),
        _CONNECTOR_ID,
        _SEARCH_SPACE_ID,
        _USER_ID,
        "cursor-v1",
        task_logger,
        MagicMock(),
        max_files=500,
        enable_summary=True,
    )

    assert indexed == 2
    assert skipped == 0
    assert cursor == "cursor-v2"

    # Third positional argument of _download_and_index is the batch of files.
    batch = download_mock.call_args.args[2]
    assert len(batch) == 2
    assert {entry["id"] for entry in batch} == {"mod1", "mod2"}
async def test_delta_sync_mix_deletions_and_upserts(monkeypatch):
    """E3: deletions processed, then remaining upserts filtered and indexed."""
    import app.tasks.connector_indexers.dropbox_indexer as _mod

    removed_ids: list[str] = []

    async def _fake_remove(session, file_id, search_space_id):
        removed_ids.append(file_id)

    download_mock = AsyncMock(return_value=(2, 0))
    monkeypatch.setattr(_mod, "_remove_document", _fake_remove)
    monkeypatch.setattr(_mod, "_should_skip_file", AsyncMock(return_value=(False, None)))
    monkeypatch.setattr(_mod, "_download_and_index", download_mock)

    deletions = [
        {".tag": "deleted", "name": name, "path_lower": f"/{name}", "id": file_id}
        for name, file_id in (("removed.txt", "id:del1"), ("trashed.pdf", "id:del2"))
    ]
    upserts = [
        _make_file_dict("mod1", "updated.txt"),
        _make_file_dict("new1", "brandnew.docx"),
    ]
    client = MagicMock()
    client.get_changes = AsyncMock(
        return_value=(deletions + upserts, "final-cursor", None)
    )

    task_logger = MagicMock()
    task_logger.log_task_progress = AsyncMock()

    indexed, skipped, cursor = await _index_with_delta_sync(
        client,
        AsyncMock(),
        _CONNECTOR_ID,
        _SEARCH_SPACE_ID,
        _USER_ID,
        "old-cursor",
        task_logger,
        MagicMock(),
        max_files=500,
        enable_summary=True,
    )

    assert sorted(removed_ids) == ["id:del1", "id:del2"]
    assert indexed == 2
    assert skipped == 0
    assert cursor == "final-cursor"

    batch = download_mock.call_args.args[2]
    assert {entry["id"] for entry in batch} == {"mod1", "new1"}
async def test_delta_sync_returns_new_cursor(monkeypatch):
    """E4: the new cursor from the API response is returned."""
    import app.tasks.connector_indexers.dropbox_indexer as _mod

    monkeypatch.setattr(_mod, "_download_and_index", AsyncMock(return_value=(0, 0)))

    # No change entries at all: only the cursor hand-off is under test.
    client = MagicMock()
    client.get_changes = AsyncMock(return_value=([], "brand-new-cursor-xyz", None))

    task_logger = MagicMock()
    task_logger.log_task_progress = AsyncMock()

    result = await _index_with_delta_sync(
        client,
        AsyncMock(),
        _CONNECTOR_ID,
        _SEARCH_SPACE_ID,
        _USER_ID,
        "old-cursor",
        task_logger,
        MagicMock(),
        max_files=500,
        enable_summary=True,
    )
    indexed, skipped, cursor = result

    assert cursor == "brand-new-cursor-xyz"
    assert indexed == 0
    assert skipped == 0
# ---------------------------------------------------------------------------
# F1-F3: index_dropbox_files orchestrator tests
# ---------------------------------------------------------------------------
@pytest.fixture
def orchestrator_mocks(monkeypatch):
    """Patch every collaborator ``index_dropbox_files`` looks up on the indexer module.

    Replaced names: ``get_connector_by_id``, ``TaskLoggingService``,
    ``update_connector_last_indexed``, ``_index_full_scan``,
    ``_index_with_delta_sync`` and ``DropboxClient``. The returned dict exposes
    the connector, the two strategy mocks and the client so tests can adjust
    state and assert on calls.
    """
    import app.tasks.connector_indexers.dropbox_indexer as _mod

    connector = MagicMock(
        config={"_token_encrypted": False},
        last_indexed_at=None,
        enable_summary=True,
    )

    task_logger = MagicMock()
    task_logger.log_task_start = AsyncMock(return_value=MagicMock())
    for method_name in ("log_task_progress", "log_task_success", "log_task_failure"):
        setattr(task_logger, method_name, AsyncMock())

    full_scan_mock = AsyncMock(return_value=(5, 2))
    delta_sync_mock = AsyncMock(return_value=(3, 1, "delta-cursor-new"))

    client = MagicMock()
    client.get_latest_cursor = AsyncMock(return_value=("latest-cursor-abc", None))

    replacements = (
        ("get_connector_by_id", AsyncMock(return_value=connector)),
        ("TaskLoggingService", MagicMock(return_value=task_logger)),
        ("update_connector_last_indexed", AsyncMock()),
        ("_index_full_scan", full_scan_mock),
        ("_index_with_delta_sync", delta_sync_mock),
        ("DropboxClient", MagicMock(return_value=client)),
    )
    for attr_name, fake in replacements:
        monkeypatch.setattr(_mod, attr_name, fake)

    return dict(
        connector=connector,
        full_scan_mock=full_scan_mock,
        delta_sync_mock=delta_sync_mock,
        mock_client=client,
    )
async def test_orchestrator_uses_delta_sync_when_cursor_and_last_indexed(
    orchestrator_mocks,
):
    """F1: with cursor + last_indexed_at + use_delta_sync, calls delta sync."""
    from datetime import UTC, datetime

    connector = orchestrator_mocks["connector"]
    connector.last_indexed_at = datetime(2026, 1, 1, tzinfo=UTC)
    connector.config = {
        "_token_encrypted": False,
        "folder_cursors": {"/docs": "saved-cursor-123"},
    }

    session = AsyncMock()
    session.commit = AsyncMock()
    request = {
        "folders": [{"path": "/docs", "name": "Docs"}],
        "files": [],
        "indexing_options": {"use_delta_sync": True},
    }

    _indexed, _skipped, error = await index_dropbox_files(
        session, _CONNECTOR_ID, _SEARCH_SPACE_ID, _USER_ID, request
    )

    assert error is None
    orchestrator_mocks["delta_sync_mock"].assert_called_once()
    orchestrator_mocks["full_scan_mock"].assert_not_called()
async def test_orchestrator_falls_back_to_full_scan_without_cursor(
    orchestrator_mocks,
):
    """F2: without cursor, falls back to full scan."""
    connector = orchestrator_mocks["connector"]
    connector.last_indexed_at = None
    # No "folder_cursors" key: nothing saved for delta sync to resume from.
    connector.config = {"_token_encrypted": False}

    session = AsyncMock()
    session.commit = AsyncMock()
    request = {
        "folders": [{"path": "/docs", "name": "Docs"}],
        "files": [],
        "indexing_options": {"use_delta_sync": True},
    }

    _indexed, _skipped, error = await index_dropbox_files(
        session, _CONNECTOR_ID, _SEARCH_SPACE_ID, _USER_ID, request
    )

    assert error is None
    orchestrator_mocks["full_scan_mock"].assert_called_once()
    orchestrator_mocks["delta_sync_mock"].assert_not_called()
async def test_orchestrator_persists_cursor_after_sync(orchestrator_mocks):
    """F3: after sync, persists new cursor to connector config."""
    connector = orchestrator_mocks["connector"]
    connector.last_indexed_at = None
    connector.config = {"_token_encrypted": False}

    session = AsyncMock()
    session.commit = AsyncMock()
    request = {"folders": [{"path": "/docs", "name": "Docs"}], "files": []}

    await index_dropbox_files(
        session, _CONNECTOR_ID, _SEARCH_SPACE_ID, _USER_ID, request
    )

    # "latest-cursor-abc" is what the fixture's get_latest_cursor stub returns.
    saved = connector.config
    assert "folder_cursors" in saved
    assert saved["folder_cursors"]["/docs"] == "latest-cursor-abc"

View file

@ -0,0 +1,115 @@
"""Tests for DropboxClient delta-sync methods (get_latest_cursor, get_changes)."""
from unittest.mock import AsyncMock, MagicMock
import pytest
from app.connectors.dropbox.client import DropboxClient
pytestmark = pytest.mark.unit
def _make_client() -> DropboxClient:
    """Build a DropboxClient without running ``__init__``.

    ``__new__`` is used so no constructor logic executes; the session is a
    MagicMock and the connector id is fixed at 1.
    """
    stub = DropboxClient.__new__(DropboxClient)
    stub._connector_id = 1
    stub._session = MagicMock()
    return stub
# ---------- C1: get_latest_cursor ----------
async def test_get_latest_cursor_returns_cursor_string(monkeypatch):
    """C1: a 200 response yields its cursor and no error, via one call to the cursor endpoint."""
    response = MagicMock(status_code=200)
    response.json.return_value = {"cursor": "AAHbKxRZ9enq…"}
    request_mock = AsyncMock(return_value=response)

    client = _make_client()
    monkeypatch.setattr(client, "_request", request_mock)

    cursor, error = await client.get_latest_cursor("/my-folder")

    assert cursor == "AAHbKxRZ9enq…"
    assert error is None

    expected_payload = {
        "path": "/my-folder",
        "recursive": False,
        "include_non_downloadable_files": True,
    }
    request_mock.assert_called_once_with(
        "/2/files/list_folder/get_latest_cursor", expected_payload
    )
# ---------- C2: get_changes returns entries and new cursor ----------
async def test_get_changes_returns_entries_and_cursor(monkeypatch):
    """C2: a single-page response returns its entries in order plus the new cursor."""
    payload = {
        "entries": [
            {".tag": "file", "name": "new.txt", "id": "id:abc"},
            {".tag": "deleted", "name": "old.txt"},
        ],
        "cursor": "cursor-v2",
        "has_more": False,
    }
    response = MagicMock(status_code=200)
    response.json.return_value = payload

    client = _make_client()
    monkeypatch.setattr(client, "_request", AsyncMock(return_value=response))

    entries, new_cursor, error = await client.get_changes("cursor-v1")

    assert error is None
    assert new_cursor == "cursor-v2"
    assert len(entries) == 2
    first, second = entries[0], entries[1]
    assert first["name"] == "new.txt"
    assert second[".tag"] == "deleted"
# ---------- C3: get_changes handles pagination ----------
async def test_get_changes_handles_pagination(monkeypatch):
    """C3: when has_more is true a second request is made and both pages are merged."""

    def _page(file_name, file_id, cursor, has_more):
        # One fake 200 response carrying a single file entry.
        response = MagicMock(status_code=200)
        response.json.return_value = {
            "entries": [{".tag": "file", "name": file_name, "id": file_id}],
            "cursor": cursor,
            "has_more": has_more,
        }
        return response

    pages = [
        _page("a.txt", "id:a", "cursor-page2", True),
        _page("b.txt", "id:b", "cursor-final", False),
    ]
    request_mock = AsyncMock(side_effect=pages)

    client = _make_client()
    monkeypatch.setattr(client, "_request", request_mock)

    entries, new_cursor, error = await client.get_changes("cursor-v1")

    assert error is None
    assert new_cursor == "cursor-final"
    assert len(entries) == 2
    assert {entry["name"] for entry in entries} == {"a.txt", "b.txt"}
    assert request_mock.call_count == 2
# ---------- C4: get_changes returns an error (does not raise) on 401 ----------
async def test_get_changes_returns_error_on_401(monkeypatch):
    """C4: a 401 response is reported as an error string with no entries and no cursor."""
    unauthorized = MagicMock(status_code=401, text="Unauthorized")

    client = _make_client()
    monkeypatch.setattr(client, "_request", AsyncMock(return_value=unauthorized))

    entries, new_cursor, error = await client.get_changes("old-cursor")

    assert error is not None
    assert "401" in error
    assert entries == []
    assert new_cursor is None

View file

@ -0,0 +1,73 @@
"""Tests for Dropbox file type filtering (should_skip_file)."""
import pytest
from app.connectors.dropbox.file_types import should_skip_file
pytestmark = pytest.mark.unit
def test_folder_item_is_skipped():
    """An entry tagged as a folder is skipped."""
    verdict = should_skip_file({".tag": "folder", "name": "My Folder"})
    assert verdict is True
def test_paper_file_is_not_skipped():
    """A ``.paper`` file is kept even though it is flagged non-downloadable."""
    verdict = should_skip_file(
        {".tag": "file", "name": "notes.paper", "is_downloadable": False}
    )
    assert verdict is False
def test_non_downloadable_item_is_skipped():
    """A non-downloadable file that is not a ``.paper`` file is skipped."""
    verdict = should_skip_file(
        {".tag": "file", "name": "locked.gdoc", "is_downloadable": False}
    )
    assert verdict is True
@pytest.mark.parametrize(
    "filename",
    [
        # archives
        "archive.zip",
        "backup.tar",
        "data.gz",
        "stuff.rar",
        "pack.7z",
        # executables, libraries and disk images
        "program.exe",
        "lib.dll",
        "module.so",
        "image.dmg",
        "disk.iso",
        # video
        "movie.mov",
        "clip.avi",
        "video.mkv",
        "film.wmv",
        "stream.flv",
        # image formats expected to be rejected
        "icon.svg",
        "anim.gif",
        "photo.webp",
        "shot.heic",
        "favicon.ico",
        # camera raw
        "raw.cr2",
        "photo.nef",
        "image.arw",
        "pic.dng",
        # design tools
        "design.psd",
        "vector.ai",
        "mockup.sketch",
        "proto.fig",
        # fonts
        "font.ttf",
        "font.otf",
        "font.woff",
        "font.woff2",
        # 3D assets
        "model.stl",
        "scene.fbx",
        "mesh.blend",
        # database files
        "local.db",
        "data.sqlite",
        "access.mdb",
    ],
)
def test_non_parseable_extensions_are_skipped(filename):
    """Files whose extension is on the rejected list are skipped."""
    verdict = should_skip_file({".tag": "file", "name": filename})
    assert verdict is True, f"{filename} should be skipped"
@pytest.mark.parametrize(
    "filename",
    [
        # current office formats and PDF
        "report.pdf",
        "document.docx",
        "sheet.xlsx",
        "slides.pptx",
        # legacy office formats
        "old.doc",
        "legacy.xls",
        "deck.ppt",
        # plain-text and markup formats
        "readme.txt",
        "data.csv",
        "page.html",
        "notes.md",
        "config.json",
        "feed.xml",
    ],
)
def test_parseable_documents_are_not_skipped(filename):
    """Common document and text formats pass the filter."""
    verdict = should_skip_file({".tag": "file", "name": filename})
    assert verdict is False, f"{filename} should NOT be skipped"
@pytest.mark.parametrize(
    "filename",
    [
        "photo.jpg",
        "image.jpeg",
        "screenshot.png",
        "scan.bmp",
        "page.tiff",
        "doc.tif",
    ],
)
def test_universal_images_are_not_skipped(filename):
    """JPEG, PNG, BMP and TIFF images pass the filter."""
    verdict = should_skip_file({".tag": "file", "name": filename})
    assert verdict is False, f"{filename} should NOT be skipped"
@pytest.mark.parametrize(
    "filename",
    [
        "icon.svg",
        "anim.gif",
        "photo.webp",
        "live.heic",
    ],
)
def test_non_universal_images_are_skipped(filename):
    """SVG, GIF, WebP and HEIC images are skipped."""
    verdict = should_skip_file({".tag": "file", "name": filename})
    assert verdict is True, f"{filename} should be skipped"

View file

@ -0,0 +1,43 @@
"""Test that Dropbox re-auth preserves folder_cursors in connector config."""
import pytest
pytestmark = pytest.mark.unit
def test_reauth_preserves_folder_cursors():
    """G1: re-authentication preserves folder_cursors alongside cursor.

    NOTE(review): the merge is rebuilt inline here and no application code is
    called, so this checks the intended shape of the merged config, not the
    re-auth handler itself -- consider driving the real handler.
    """
    saved_folder_cursors = {
        "/docs": "cursor-docs-123",
        "/photos": "cursor-photos-456",
    }
    old_config = {
        "access_token": "old-token-enc",
        "refresh_token": "old-refresh-enc",
        "cursor": "old-cursor-abc",
        "folder_cursors": saved_folder_cursors,
        "_token_encrypted": True,
        "auth_expired": True,
    }
    new_connector_config = {
        "access_token": "new-token-enc",
        "refresh_token": "new-refresh-enc",
        "token_type": "bearer",
        "expires_in": 14400,
        "expires_at": "2026-04-06T16:00:00+00:00",
        "_token_encrypted": True,
    }

    # Fresh token fields first, then the sync state carried over from the old
    # config, then the expiry flag reset.
    merged_config = dict(new_connector_config)
    merged_config.update(
        cursor=old_config.get("cursor"),
        folder_cursors=old_config.get("folder_cursors"),
        auth_expired=False,
    )

    assert merged_config["access_token"] == "new-token-enc"
    assert merged_config["cursor"] == "old-cursor-abc"
    assert merged_config["folder_cursors"] == {
        "/docs": "cursor-docs-123",
        "/photos": "cursor-photos-456",
    }
    assert merged_config["auth_expired"] is False

View file

@ -0,0 +1,67 @@
"""Test that DoclingService registers InputFormat.IMAGE for image processing."""
from enum import Enum
from unittest.mock import MagicMock, patch
import pytest
pytestmark = pytest.mark.unit
class _FakeInputFormat(Enum):
    """Two-member stand-in used as ``InputFormat`` in the stubbed docling modules."""

    # Only the members this test module needs are defined.
    PDF = "pdf"
    IMAGE = "image"
def test_docling_service_registers_image_format():
    """DoclingService should initialise DocumentConverter with InputFormat.IMAGE
    registered, either in ``allowed_formats`` or as a ``format_options`` key,
    so that image files (jpg, png, bmp, tiff) are accepted.

    NOTE(review): the service module is reloaded while ``sys.modules`` holds
    stub docling modules; if it was already imported before this test, it
    presumably stays bound to the stubs afterwards -- confirm this does not
    leak into other tests.
    """
    from importlib import reload

    converter_cls = MagicMock()
    backend_cls = MagicMock()
    pipeline_options_cls = MagicMock()
    pipeline_options_cls.return_value = MagicMock()
    pdf_format_option_cls = MagicMock()

    stub_modules = {
        "docling": MagicMock(),
        "docling.backend": MagicMock(),
        "docling.backend.pypdfium2_backend": MagicMock(
            PyPdfiumDocumentBackend=backend_cls
        ),
        "docling.datamodel": MagicMock(),
        "docling.datamodel.base_models": MagicMock(InputFormat=_FakeInputFormat),
        "docling.datamodel.pipeline_options": MagicMock(
            PdfPipelineOptions=pipeline_options_cls
        ),
        "docling.document_converter": MagicMock(
            DocumentConverter=converter_cls,
            PdfFormatOption=pdf_format_option_cls,
        ),
    }

    with patch.dict("sys.modules", stub_modules):
        import app.services.docling_service as mod

        # Reload so the module-level docling imports resolve to the stubs.
        reload(mod)
        mod.DoclingService()

    recorded_call = converter_cls.call_args
    assert recorded_call is not None, "DocumentConverter was never called"

    kwargs = recorded_call.kwargs
    allowed = kwargs.get("allowed_formats")
    format_opts = kwargs.get("format_options", {})

    # Either registration route is accepted.
    if allowed is not None and _FakeInputFormat.IMAGE in allowed:
        image_registered = True
    else:
        image_registered = _FakeInputFormat.IMAGE in format_opts

    assert image_registered, (
        f"InputFormat.IMAGE not registered. "
        f"allowed_formats={allowed}, format_options keys={list(format_opts.keys())}"
    )