diff --git a/surfsense_backend/app/services/page_limit_service.py b/surfsense_backend/app/services/page_limit_service.py index 080d05b5d..ea22067be 100644 --- a/surfsense_backend/app/services/page_limit_service.py +++ b/surfsense_backend/app/services/page_limit_service.py @@ -3,7 +3,7 @@ Service for managing user page limits for ETL services. """ import os -from pathlib import Path +from pathlib import Path, PurePosixPath from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession @@ -223,10 +223,91 @@ class PageLimitService: # Estimate ~2000 characters per page return max(1, content_length // 2000) + @staticmethod + def estimate_pages_from_metadata( + file_name_or_ext: str, file_size: int | str | None = None + ) -> int: + """Size-based page estimation from file name/extension and byte size. + + Pure function — no file I/O, no database access. Used by cloud + connectors (which only have API metadata) and as the internal + fallback for :meth:`estimate_pages_before_processing`. + + ``file_name_or_ext`` can be a full filename (``"report.pdf"``) or + a bare extension (``".pdf"``). ``file_size`` may be an int, a + stringified int from a cloud API, or *None*. + """ + if file_size is not None: + try: + file_size = int(file_size) + except (ValueError, TypeError): + file_size = 0 + else: + file_size = 0 + + if file_size <= 0: + return 1 + + ext = PurePosixPath(file_name_or_ext).suffix.lower() if file_name_or_ext else "" + if not ext and file_name_or_ext.startswith("."): + ext = file_name_or_ext.lower() + file_ext = ext + + if file_ext == ".pdf": + return max(1, file_size // (100 * 1024)) + + if file_ext in { + ".doc", ".docx", ".docm", ".dot", ".dotm", + ".odt", ".ott", ".sxw", ".stw", ".uot", + ".rtf", ".pages", ".wpd", ".wps", + ".abw", ".zabw", ".cwk", ".hwp", ".lwp", + ".mcw", ".mw", ".sdw", ".vor", + }: + return max(1, file_size // (50 * 1024)) + + if file_ext in { + ".ppt", ".pptx", ".pptm", ".pot", ".potx", + ".odp", ".otp", ".sxi", ".sti", ".uop", + ".key", ".sda", ".sdd", ".sdp", + }: + return max(1, file_size // (200 * 1024)) + + if file_ext in { + ".xls", ".xlsx", ".xlsm", ".xlsb", ".xlw", ".xlr", + ".ods", ".ots", ".fods", ".numbers", + ".123", ".wk1", ".wk2", ".wk3", ".wk4", ".wks", + ".wb1", ".wb2", ".wb3", ".wq1", ".wq2", + ".csv", ".tsv", ".slk", ".sylk", ".dif", ".dbf", + ".prn", ".qpw", ".602", ".et", ".eth", + }: + return max(1, file_size // (100 * 1024)) + + if file_ext in {".epub"}: + return max(1, file_size // (50 * 1024)) + + if file_ext in {".txt", ".log", ".md", ".markdown", ".htm", ".html", ".xml"}: + return max(1, file_size // 3000) + + if file_ext in { + ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", + ".webp", ".svg", ".cgm", ".odg", ".pbd", + }: + return 1 + + if file_ext in {".mp3", ".m4a", ".wav", ".mpga"}: + return max(1, file_size // (1024 * 1024)) + + if file_ext in {".mp4", ".mpeg", ".webm"}: + return max(1, file_size // (5 * 1024 * 1024)) + + return max(1, file_size // (80 * 1024)) + def estimate_pages_before_processing(self, file_path: str) -> int: """ - Estimate page count from file before processing (to avoid unnecessary API calls). - This is called BEFORE sending to ETL services to prevent cost on rejected files. + Estimate page count from a local file before processing. + + For PDFs, attempts to read the actual page count via pypdf. + For everything else, delegates to :meth:`estimate_pages_from_metadata`. Args: file_path: Path to the file @@ -240,7 +321,6 @@ class PageLimitService: file_ext = Path(file_path).suffix.lower() file_size = os.path.getsize(file_path) - # PDF files - try to get actual page count if file_ext == ".pdf": try: import pypdf @@ -249,153 +329,6 @@ class PageLimitService: pdf_reader = pypdf.PdfReader(f) return len(pdf_reader.pages) except Exception: - # If PDF reading fails, fall back to size estimation - # Typical PDF: ~100KB per page (conservative estimate) - return max(1, file_size // (100 * 1024)) + pass # fall through to size-based estimation - # Word Processing Documents - # Microsoft Word, LibreOffice Writer, WordPerfect, Pages, etc. - elif file_ext in [ - ".doc", - ".docx", - ".docm", - ".dot", - ".dotm", # Microsoft Word - ".odt", - ".ott", - ".sxw", - ".stw", - ".uot", # OpenDocument/StarOffice Writer - ".rtf", # Rich Text Format - ".pages", # Apple Pages - ".wpd", - ".wps", # WordPerfect, Microsoft Works - ".abw", - ".zabw", # AbiWord - ".cwk", - ".hwp", - ".lwp", - ".mcw", - ".mw", - ".sdw", - ".vor", # Other word processors - ]: - # Typical word document: ~50KB per page (conservative) - return max(1, file_size // (50 * 1024)) - - # Presentation Documents - # PowerPoint, Impress, Keynote, etc. - elif file_ext in [ - ".ppt", - ".pptx", - ".pptm", - ".pot", - ".potx", # Microsoft PowerPoint - ".odp", - ".otp", - ".sxi", - ".sti", - ".uop", # OpenDocument/StarOffice Impress - ".key", # Apple Keynote - ".sda", - ".sdd", - ".sdp", # StarOffice Draw/Impress - ]: - # Typical presentation: ~200KB per slide (conservative) - return max(1, file_size // (200 * 1024)) - - # Spreadsheet Documents - # Excel, Calc, Numbers, Lotus, etc. - elif file_ext in [ - ".xls", - ".xlsx", - ".xlsm", - ".xlsb", - ".xlw", - ".xlr", # Microsoft Excel - ".ods", - ".ots", - ".fods", # OpenDocument Spreadsheet - ".numbers", # Apple Numbers - ".123", - ".wk1", - ".wk2", - ".wk3", - ".wk4", - ".wks", # Lotus 1-2-3 - ".wb1", - ".wb2", - ".wb3", - ".wq1", - ".wq2", # Quattro Pro - ".csv", - ".tsv", - ".slk", - ".sylk", - ".dif", - ".dbf", - ".prn", - ".qpw", # Data formats - ".602", - ".et", - ".eth", # Other spreadsheets - ]: - # Spreadsheets typically have 1 sheet = 1 page for ETL - # Conservative: ~100KB per sheet - return max(1, file_size // (100 * 1024)) - - # E-books - elif file_ext in [".epub"]: - # E-books vary widely, estimate by size - # Typical e-book: ~50KB per page - return max(1, file_size // (50 * 1024)) - - # Plain Text and Markup Files - elif file_ext in [ - ".txt", - ".log", # Plain text - ".md", - ".markdown", # Markdown - ".htm", - ".html", - ".xml", # Markup - ]: - # Plain text: ~3000 bytes per page - return max(1, file_size // 3000) - - # Image Files - # Each image is typically processed as 1 page - elif file_ext in [ - ".jpg", - ".jpeg", # JPEG - ".png", # PNG - ".gif", # GIF - ".bmp", # Bitmap - ".tiff", # TIFF - ".webp", # WebP - ".svg", # SVG - ".cgm", # Computer Graphics Metafile - ".odg", - ".pbd", # OpenDocument Graphics - ]: - # Each image = 1 page - return 1 - - # Audio Files (transcription = typically 1 page per minute) - # Note: These should be handled by audio transcription flow, not ETL - elif file_ext in [".mp3", ".m4a", ".wav", ".mpga"]: - # Audio files: estimate based on duration - # Fallback: ~1MB per minute of audio, 1 page per minute transcript - return max(1, file_size // (1024 * 1024)) - - # Video Files (typically not processed for pages, but just in case) - elif file_ext in [".mp4", ".mpeg", ".webm"]: - # Video files: very rough estimate - # Typically wouldn't be page-based, but use conservative estimate - return max(1, file_size // (5 * 1024 * 1024)) - - # Other/Unknown Document Types - else: - # Conservative estimate: ~80KB per page - # This catches: .sgl, .sxg, .uof, .uos1, .uos2, .web, and any future formats - return max(1, file_size // (80 * 1024)) + return self.estimate_pages_from_metadata(file_ext, file_size) diff --git a/surfsense_backend/app/tasks/connector_indexers/base.py b/surfsense_backend/app/tasks/connector_indexers/base.py index ffc8ab72e..6b4bed4b5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/base.py +++ b/surfsense_backend/app/tasks/connector_indexers/base.py @@ -4,7 +4,6 @@ Base functionality and shared imports for connector indexers. import logging from datetime import UTC, datetime, timedelta - from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select diff --git a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py index 1b039add7..87b3c55df 100644 --- a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py @@ -28,6 +28,7 @@ from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_hashing import compute_identifier_hash from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.llm_service import get_user_long_context_llm +from app.services.page_limit_service import PageLimitService from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, @@ -278,6 +279,12 @@ async def _index_full_scan( }, ) + page_limit_service = PageLimitService(session) + pages_used, pages_limit = await page_limit_service.get_page_usage(user_id) + remaining_quota = pages_limit - pages_used + batch_estimated_pages = 0 + page_limit_reached = False + renamed_count = 0 skipped = 0 files_to_download: list[dict] = [] @@ -307,6 +314,21 @@ async def _index_full_scan( elif skip_item(file): skipped += 1 continue + + file_pages = PageLimitService.estimate_pages_from_metadata( + file.get("name", ""), file.get("size") + ) + if batch_estimated_pages + file_pages > remaining_quota: + if not page_limit_reached: + logger.warning( + "Page limit reached during Dropbox full scan, " + "skipping remaining files" + ) + page_limit_reached = True + skipped += 1 + continue + + batch_estimated_pages += file_pages files_to_download.append(file) batch_indexed, failed = await _download_and_index( @@ -320,6 +342,14 @@ async def _index_full_scan( on_heartbeat=on_heartbeat_callback, ) + if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0: + pages_to_deduct = max( + 1, batch_estimated_pages * batch_indexed // len(files_to_download) + ) + await page_limit_service.update_page_usage( + user_id, pages_to_deduct, allow_exceed=True + ) + indexed = renamed_count + batch_indexed logger.info( f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed" @@ -340,6 +370,11 @@ async def _index_selected_files( on_heartbeat: HeartbeatCallbackType | None = None, ) -> tuple[int, int, list[str]]: """Index user-selected files using the parallel pipeline.""" + page_limit_service = PageLimitService(session) + pages_used, pages_limit = await page_limit_service.get_page_usage(user_id) + remaining_quota = pages_limit - pages_used + batch_estimated_pages = 0 + files_to_download: list[dict] = [] errors: list[str] = [] renamed_count = 0 @@ -364,6 +399,15 @@ async def _index_selected_files( skipped += 1 continue + file_pages = PageLimitService.estimate_pages_from_metadata( + file.get("name", ""), file.get("size") + ) + if batch_estimated_pages + file_pages > remaining_quota: + display = file_name or file_path + errors.append(f"File '{display}': page limit would be exceeded") + continue + + batch_estimated_pages += file_pages files_to_download.append(file) batch_indexed, _failed = await _download_and_index( @@ -377,6 +421,14 @@ async def _index_selected_files( on_heartbeat=on_heartbeat, ) + if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0: + pages_to_deduct = max( + 1, batch_estimated_pages * batch_indexed // len(files_to_download) + ) + await page_limit_service.update_page_usage( + user_id, pages_to_deduct, allow_exceed=True + ) + return renamed_count + batch_indexed, skipped, errors diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index b03d305f7..5e9e0f62f 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -34,6 +34,7 @@ from app.indexing_pipeline.indexing_pipeline_service import ( PlaceholderInfo, ) from app.services.llm_service import get_user_long_context_llm +from app.services.page_limit_service import PageLimitService from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, @@ -327,6 +328,12 @@ async def _process_single_file( return 1, 0, 0 return 0, 1, 0 + page_limit_service = PageLimitService(session) + estimated_pages = PageLimitService.estimate_pages_from_metadata( + file_name, file.get("size") + ) + await page_limit_service.check_page_limit(user_id, estimated_pages) + markdown, drive_metadata, error = await download_and_extract_content( drive_client, file ) @@ -363,6 +370,9 @@ async def _process_single_file( ) await pipeline.index(document, connector_doc, user_llm) + await page_limit_service.update_page_usage( + user_id, estimated_pages, allow_exceed=True + ) logger.info(f"Successfully indexed Google Drive file: {file_name}") return 1, 0, 0 @@ -466,6 +476,11 @@ async def _index_selected_files( Returns (indexed_count, skipped_count, errors). """ + page_limit_service = PageLimitService(session) + pages_used, pages_limit = await page_limit_service.get_page_usage(user_id) + remaining_quota = pages_limit - pages_used + batch_estimated_pages = 0 + files_to_download: list[dict] = [] errors: list[str] = [] renamed_count = 0 @@ -486,6 +501,15 @@ async def _index_selected_files( skipped += 1 continue + file_pages = PageLimitService.estimate_pages_from_metadata( + file.get("name", ""), file.get("size") + ) + if batch_estimated_pages + file_pages > remaining_quota: + display = file_name or file_id + errors.append(f"File '{display}': page limit would be exceeded") + continue + + batch_estimated_pages += file_pages files_to_download.append(file) await _create_drive_placeholders( @@ -507,6 +531,14 @@ async def _index_selected_files( on_heartbeat=on_heartbeat, ) + if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0: + pages_to_deduct = max( + 1, batch_estimated_pages * batch_indexed // len(files_to_download) + ) + await page_limit_service.update_page_usage( + user_id, pages_to_deduct, allow_exceed=True + ) + return renamed_count + batch_indexed, skipped, errors @@ -545,6 +577,12 @@ async def _index_full_scan( # ------------------------------------------------------------------ # Phase 1 (serial): collect files, run skip checks, track renames # ------------------------------------------------------------------ + page_limit_service = PageLimitService(session) + pages_used, pages_limit = await page_limit_service.get_page_usage(user_id) + remaining_quota = pages_limit - pages_used + batch_estimated_pages = 0 + page_limit_reached = False + renamed_count = 0 skipped = 0 files_processed = 0 @@ -593,6 +631,20 @@ async def _index_full_scan( skipped += 1 continue + file_pages = PageLimitService.estimate_pages_from_metadata( + file.get("name", ""), file.get("size") + ) + if batch_estimated_pages + file_pages > remaining_quota: + if not page_limit_reached: + logger.warning( + "Page limit reached during Google Drive full scan, " + "skipping remaining files" + ) + page_limit_reached = True + skipped += 1 + continue + + batch_estimated_pages += file_pages files_to_download.append(file) page_token = next_token @@ -636,6 +688,14 @@ async def _index_full_scan( on_heartbeat=on_heartbeat_callback, ) + if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0: + pages_to_deduct = max( + 1, batch_estimated_pages * batch_indexed // len(files_to_download) + ) + await page_limit_service.update_page_usage( + user_id, pages_to_deduct, allow_exceed=True + ) + indexed = renamed_count + batch_indexed logger.info( f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed" @@ -686,6 +746,12 @@ async def _index_with_delta_sync( # ------------------------------------------------------------------ # Phase 1 (serial): handle removals, collect files for download # ------------------------------------------------------------------ + page_limit_service = PageLimitService(session) + pages_used, pages_limit = await page_limit_service.get_page_usage(user_id) + remaining_quota = pages_limit - pages_used + batch_estimated_pages = 0 + page_limit_reached = False + renamed_count = 0 skipped = 0 files_to_download: list[dict] = [] @@ -715,6 +781,20 @@ async def _index_with_delta_sync( skipped += 1 continue + file_pages = PageLimitService.estimate_pages_from_metadata( + file.get("name", ""), file.get("size") + ) + if batch_estimated_pages + file_pages > remaining_quota: + if not page_limit_reached: + logger.warning( + "Page limit reached during Google Drive delta sync, " + "skipping remaining files" + ) + page_limit_reached = True + skipped += 1 + continue + + batch_estimated_pages += file_pages files_to_download.append(file) # ------------------------------------------------------------------ @@ -742,6 +822,14 @@ async def _index_with_delta_sync( on_heartbeat=on_heartbeat_callback, ) + if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0: + pages_to_deduct = max( + 1, batch_estimated_pages * batch_indexed // len(files_to_download) + ) + await page_limit_service.update_page_usage( + user_id, pages_to_deduct, allow_exceed=True + ) + indexed = renamed_count + batch_indexed logger.info( f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed" diff --git a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py index 748cb0988..2301b6260 100644 --- a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py @@ -28,6 +28,7 @@ from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_hashing import compute_identifier_hash from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.llm_service import get_user_long_context_llm +from app.services.page_limit_service import PageLimitService from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, @@ -291,6 +292,11 @@ async def _index_selected_files( on_heartbeat: HeartbeatCallbackType | None = None, ) -> tuple[int, int, list[str]]: """Index user-selected files using the parallel pipeline.""" + page_limit_service = PageLimitService(session) + pages_used, pages_limit = await page_limit_service.get_page_usage(user_id) + remaining_quota = pages_limit - pages_used + batch_estimated_pages = 0 + files_to_download: list[dict] = [] errors: list[str] = [] renamed_count = 0 @@ -311,6 +317,15 @@ async def _index_selected_files( skipped += 1 continue + file_pages = PageLimitService.estimate_pages_from_metadata( + file.get("name", ""), file.get("size") + ) + if batch_estimated_pages + file_pages > remaining_quota: + display = file_name or file_id + errors.append(f"File '{display}': page limit would be exceeded") + continue + + batch_estimated_pages += file_pages files_to_download.append(file) batch_indexed, _failed = await _download_and_index( @@ -324,6 +339,14 @@ async def _index_selected_files( on_heartbeat=on_heartbeat, ) + if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0: + pages_to_deduct = max( + 1, batch_estimated_pages * batch_indexed // len(files_to_download) + ) + await page_limit_service.update_page_usage( + user_id, pages_to_deduct, allow_exceed=True + ) + return renamed_count + batch_indexed, skipped, errors @@ -358,6 +381,12 @@ async def _index_full_scan( }, ) + page_limit_service = PageLimitService(session) + pages_used, pages_limit = await page_limit_service.get_page_usage(user_id) + remaining_quota = pages_limit - pages_used + batch_estimated_pages = 0 + page_limit_reached = False + renamed_count = 0 skipped = 0 files_to_download: list[dict] = [] @@ -383,6 +412,21 @@ async def _index_full_scan( else: skipped += 1 continue + + file_pages = PageLimitService.estimate_pages_from_metadata( + file.get("name", ""), file.get("size") + ) + if batch_estimated_pages + file_pages > remaining_quota: + if not page_limit_reached: + logger.warning( + "Page limit reached during OneDrive full scan, " + "skipping remaining files" + ) + page_limit_reached = True + skipped += 1 + continue + + batch_estimated_pages += file_pages files_to_download.append(file) batch_indexed, failed = await _download_and_index( @@ -396,6 +440,14 @@ async def _index_full_scan( on_heartbeat=on_heartbeat_callback, ) + if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0: + pages_to_deduct = max( + 1, batch_estimated_pages * batch_indexed // len(files_to_download) + ) + await page_limit_service.update_page_usage( + user_id, pages_to_deduct, allow_exceed=True + ) + indexed = renamed_count + batch_indexed logger.info( f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed" @@ -441,6 +493,12 @@ async def _index_with_delta_sync( logger.info(f"Processing {len(changes)} delta changes") + page_limit_service = PageLimitService(session) + pages_used, pages_limit = await page_limit_service.get_page_usage(user_id) + remaining_quota = pages_limit - pages_used + batch_estimated_pages = 0 + page_limit_reached = False + renamed_count = 0 skipped = 0 files_to_download: list[dict] = [] @@ -471,6 +529,20 @@ async def _index_with_delta_sync( skipped += 1 continue + file_pages = PageLimitService.estimate_pages_from_metadata( + change.get("name", ""), change.get("size") + ) + if batch_estimated_pages + file_pages > remaining_quota: + if not page_limit_reached: + logger.warning( + "Page limit reached during OneDrive delta sync, " + "skipping remaining files" + ) + page_limit_reached = True + skipped += 1 + continue + + batch_estimated_pages += file_pages files_to_download.append(change) batch_indexed, failed = await _download_and_index( @@ -484,6 +556,14 @@ async def _index_with_delta_sync( on_heartbeat=on_heartbeat_callback, ) + if batch_indexed > 0 and files_to_download and batch_estimated_pages > 0: + pages_to_deduct = max( + 1, batch_estimated_pages * batch_indexed // len(files_to_download) + ) + await page_limit_service.update_page_usage( + user_id, pages_to_deduct, allow_exceed=True + ) + indexed = renamed_count + batch_indexed logger.info( f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed" diff --git a/surfsense_backend/tests/integration/document_upload/conftest.py b/surfsense_backend/tests/integration/document_upload/conftest.py index 1f1c7df59..6640fefdf 100644 --- a/surfsense_backend/tests/integration/document_upload/conftest.py +++ b/surfsense_backend/tests/integration/document_upload/conftest.py @@ -3,6 +3,7 @@ Prerequisites: PostgreSQL + pgvector only. External system boundaries are mocked: + - ETL parsing — LlamaParse (external API) and Docling (heavy library) - LLM summarization, text embedding, text chunking (external APIs) - Redis heartbeat (external infrastructure) - Task dispatch is swapped via DI (InlineTaskDispatcher) @@ -11,6 +12,7 @@ External system boundaries are mocked: from __future__ import annotations import contextlib +import os from collections.abc import AsyncGenerator from unittest.mock import AsyncMock, MagicMock @@ -298,3 +300,64 @@ def _mock_redis_heartbeat(monkeypatch): "app.tasks.celery_tasks.document_tasks._run_heartbeat_loop", AsyncMock(), ) + + +@pytest.fixture(autouse=True) +def _mock_etl_parsing(monkeypatch): + """Mock ETL parsing services — LlamaParse and Docling are external boundaries. + + Preserves the real contract: empty/corrupt files raise an error just like + the actual services would, so tests covering failure paths keep working. + """ + + _MOCK_MARKDOWN = "# Mocked Document\n\nThis is mocked ETL content." + + def _reject_empty(file_path: str) -> None: + if os.path.getsize(file_path) == 0: + raise RuntimeError(f"Cannot parse empty file: {file_path}") + + # -- LlamaParse mock (external API) -------------------------------- + + class _FakeMarkdownDoc: + def __init__(self, text: str): + self.text = text + + class _FakeLlamaParseResult: + async def aget_markdown_documents(self, *, split_by_page=False): + return [_FakeMarkdownDoc(_MOCK_MARKDOWN)] + + async def _fake_llamacloud_parse(**kwargs): + _reject_empty(kwargs["file_path"]) + return _FakeLlamaParseResult() + + monkeypatch.setattr( + "app.tasks.document_processors.file_processors.parse_with_llamacloud_retry", + _fake_llamacloud_parse, + ) + + # -- Docling mock (heavy library boundary) ------------------------- + + async def _fake_docling_parse(file_path: str, filename: str): + _reject_empty(file_path) + return _MOCK_MARKDOWN + + monkeypatch.setattr( + "app.tasks.document_processors.file_processors.parse_with_docling", + _fake_docling_parse, + ) + + class _FakeDoclingResult: + class document: + @staticmethod + def export_to_markdown(): + return _MOCK_MARKDOWN + + class _FakeDocumentConverter: + def convert(self, file_path): + _reject_empty(file_path) + return _FakeDoclingResult() + + monkeypatch.setattr( + "docling.document_converter.DocumentConverter", + _FakeDocumentConverter, + ) diff --git a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py index 3fe8a183d..20bd3f3d6 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py @@ -248,12 +248,33 @@ def _folder_dict(file_id: str, name: str) -> dict: } +def _make_page_limit_session(pages_used=0, pages_limit=999_999): + """Build a mock DB session that real PageLimitService can operate against.""" + + class _FakeUser: + def __init__(self, pu, pl): + self.pages_used = pu + self.pages_limit = pl + + fake_user = _FakeUser(pages_used, pages_limit) + session = AsyncMock() + + def _make_result(*_a, **_kw): + r = MagicMock() + r.first.return_value = (fake_user.pages_used, fake_user.pages_limit) + r.unique.return_value.scalar_one_or_none.return_value = fake_user + return r + + session.execute = AsyncMock(side_effect=_make_result) + return session, fake_user + + @pytest.fixture def full_scan_mocks(mock_drive_client, monkeypatch): """Wire up all mocks needed to call _index_full_scan in isolation.""" import app.tasks.connector_indexers.google_drive_indexer as _mod - mock_session = AsyncMock() + mock_session, _ = _make_page_limit_session() mock_connector = MagicMock() mock_task_logger = MagicMock() mock_task_logger.log_task_progress = AsyncMock() @@ -472,7 +493,7 @@ async def test_delta_sync_removals_serial_rest_parallel(monkeypatch): AsyncMock(return_value=MagicMock()), ) - mock_session = AsyncMock() + mock_session, _ = _make_page_limit_session() mock_task_logger = MagicMock() mock_task_logger.log_task_progress = AsyncMock() @@ -512,7 +533,7 @@ def selected_files_mocks(mock_drive_client, monkeypatch): """Wire up mocks for _index_selected_files tests.""" import app.tasks.connector_indexers.google_drive_indexer as _mod - mock_session = AsyncMock() + mock_session, _ = _make_page_limit_session() get_file_results: dict[str, tuple[dict | None, str | None]] = {} diff --git a/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py b/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py new file mode 100644 index 000000000..1c93965f3 --- /dev/null +++ b/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py @@ -0,0 +1,648 @@ +"""Tests for page limit enforcement in connector indexers. + +Covers: + A) PageLimitService.estimate_pages_from_metadata — pure function (no mocks) + B) Page-limit quota gating in _index_selected_files tested through the + real PageLimitService with a mock DB session (system boundary). + Google Drive is the primary, with OneDrive/Dropbox smoke tests. +""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from app.services.page_limit_service import PageLimitService + +pytestmark = pytest.mark.unit + +_USER_ID = "00000000-0000-0000-0000-000000000001" +_CONNECTOR_ID = 42 +_SEARCH_SPACE_ID = 1 + + +# =================================================================== +# A) PageLimitService.estimate_pages_from_metadata — pure function +# No mocks: it's a staticmethod with no I/O. +# =================================================================== + + +class TestEstimatePagesFromMetadata: + """Vertical slices for the page estimation staticmethod.""" + + def test_pdf_100kb_returns_1(self): + assert PageLimitService.estimate_pages_from_metadata(".pdf", 100 * 1024) == 1 + + def test_pdf_500kb_returns_5(self): + assert PageLimitService.estimate_pages_from_metadata(".pdf", 500 * 1024) == 5 + + def test_pdf_1mb(self): + assert PageLimitService.estimate_pages_from_metadata(".pdf", 1024 * 1024) == 10 + + def test_docx_50kb_returns_1(self): + assert PageLimitService.estimate_pages_from_metadata(".docx", 50 * 1024) == 1 + + def test_docx_200kb(self): + assert PageLimitService.estimate_pages_from_metadata(".docx", 200 * 1024) == 4 + + def test_pptx_uses_200kb_per_page(self): + assert PageLimitService.estimate_pages_from_metadata(".pptx", 600 * 1024) == 3 + + def test_xlsx_uses_100kb_per_page(self): + assert PageLimitService.estimate_pages_from_metadata(".xlsx", 300 * 1024) == 3 + + def test_txt_uses_3000_bytes_per_page(self): + assert PageLimitService.estimate_pages_from_metadata(".txt", 9000) == 3 + + def test_image_always_returns_1(self): + for ext in (".jpg", ".png", ".gif", ".webp"): + assert PageLimitService.estimate_pages_from_metadata(ext, 5_000_000) == 1 + + def test_audio_uses_1mb_per_page(self): + assert PageLimitService.estimate_pages_from_metadata(".mp3", 3 * 1024 * 1024) == 3 + + def test_video_uses_5mb_per_page(self): + assert PageLimitService.estimate_pages_from_metadata(".mp4", 15 * 1024 * 1024) == 3 + + def test_unknown_ext_uses_80kb_per_page(self): + assert PageLimitService.estimate_pages_from_metadata(".xyz", 160 * 1024) == 2 + + def test_zero_size_returns_1(self): + assert PageLimitService.estimate_pages_from_metadata(".pdf", 0) == 1 + + def test_negative_size_returns_1(self): + assert PageLimitService.estimate_pages_from_metadata(".pdf", -500) == 1 + + def test_minimum_is_always_1(self): + assert PageLimitService.estimate_pages_from_metadata(".pdf", 50) == 1 + + def test_epub_uses_50kb_per_page(self): + assert PageLimitService.estimate_pages_from_metadata(".epub", 250 * 1024) == 5 + + +# =================================================================== +# B) Page-limit enforcement in connector indexers +# System boundary mocked: DB session (for PageLimitService) +# System boundary mocked: external API clients, download/ETL +# NOT mocked: PageLimitService itself (our own code) +# =================================================================== + + +class _FakeUser: + """Stands in for the User ORM model at the DB boundary.""" + + def __init__(self, pages_used: int = 0, pages_limit: int = 100): + self.pages_used = pages_used + self.pages_limit = pages_limit + + +def _make_page_limit_session(pages_used: int = 0, pages_limit: int = 100): + """Build a mock DB session that real PageLimitService can operate against. + + Every ``session.execute()`` returns a result compatible with both + ``get_page_usage`` (.first() → tuple) and ``update_page_usage`` + (.unique().scalar_one_or_none() → User-like). + """ + fake_user = _FakeUser(pages_used, pages_limit) + session = AsyncMock() + + def _make_result(*_args, **_kwargs): + result = MagicMock() + result.first.return_value = (fake_user.pages_used, fake_user.pages_limit) + result.unique.return_value.scalar_one_or_none.return_value = fake_user + return result + + session.execute = AsyncMock(side_effect=_make_result) + return session, fake_user + + +def _make_gdrive_file(file_id: str, name: str, size: int = 80 * 1024) -> dict: + return { + "id": file_id, + "name": name, + "mimeType": "application/octet-stream", + "size": str(size), + } + + +# --------------------------------------------------------------------------- +# Google Drive: _index_selected_files +# --------------------------------------------------------------------------- + + +@pytest.fixture +def gdrive_selected_mocks(monkeypatch): + """Mocks for Google Drive _index_selected_files — only system boundaries.""" + import app.tasks.connector_indexers.google_drive_indexer as _mod + + session, fake_user = _make_page_limit_session(0, 100) + + get_file_results: dict[str, tuple[dict | None, str | None]] = {} + + async def _fake_get_file(client, file_id): + return get_file_results.get(file_id, (None, f"Not configured: {file_id}")) + + monkeypatch.setattr(_mod, "get_file_by_id", _fake_get_file) + monkeypatch.setattr( + _mod, "_should_skip_file", AsyncMock(return_value=(False, None)) + ) + + download_and_index_mock = AsyncMock(return_value=(0, 0)) + monkeypatch.setattr(_mod, "_download_and_index", download_and_index_mock) + + pipeline_mock = MagicMock() + pipeline_mock.create_placeholder_documents = AsyncMock(return_value=0) + monkeypatch.setattr( + _mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock) + ) + + return { + "mod": _mod, + "session": session, + "fake_user": fake_user, + "get_file_results": get_file_results, + "download_and_index_mock": download_and_index_mock, + } + + +async def _run_gdrive_selected(mocks, file_ids): + from app.tasks.connector_indexers.google_drive_indexer import ( + _index_selected_files, + ) + + return await _index_selected_files( + MagicMock(), + mocks["session"], + file_ids, + connector_id=_CONNECTOR_ID, + search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, + enable_summary=True, + ) + + +async def test_gdrive_files_within_quota_are_downloaded(gdrive_selected_mocks): + """Files whose cumulative estimated pages fit within remaining quota + are sent to _download_and_index.""" + m = gdrive_selected_mocks + m["fake_user"].pages_used = 0 + m["fake_user"].pages_limit = 100 + + for fid in ("f1", "f2", "f3"): + m["get_file_results"][fid] = ( + _make_gdrive_file(fid, f"{fid}.xyz", size=80 * 1024), None, + ) + m["download_and_index_mock"].return_value = (3, 0) + + indexed, _skipped, errors = await _run_gdrive_selected( + m, [("f1", "f1.xyz"), ("f2", "f2.xyz"), ("f3", "f3.xyz")] + ) + + assert indexed == 3 + assert errors == [] + call_files = m["download_and_index_mock"].call_args[0][2] + assert len(call_files) == 3 + + +async def test_gdrive_files_exceeding_quota_rejected(gdrive_selected_mocks): + """Files whose pages would exceed remaining quota are rejected.""" + m = gdrive_selected_mocks + m["fake_user"].pages_used = 98 + m["fake_user"].pages_limit = 100 + + m["get_file_results"]["big"] = ( + _make_gdrive_file("big", "huge.pdf", size=500 * 1024), None, + ) + + indexed, _skipped, errors = await _run_gdrive_selected(m, [("big", "huge.pdf")]) + + assert indexed == 0 + assert len(errors) == 1 + assert "page limit" in errors[0].lower() + + +async def test_gdrive_quota_mix_partial_indexing(gdrive_selected_mocks): + """3rd file pushes over quota → only first two indexed.""" + m = gdrive_selected_mocks + m["fake_user"].pages_used = 0 + m["fake_user"].pages_limit = 2 + + for fid in ("f1", "f2", "f3"): + m["get_file_results"][fid] = ( + _make_gdrive_file(fid, f"{fid}.xyz", size=80 * 1024), None, + ) + m["download_and_index_mock"].return_value = (2, 0) + + indexed, _skipped, errors = await _run_gdrive_selected( + m, [("f1", "f1.xyz"), ("f2", "f2.xyz"), ("f3", "f3.xyz")] + ) + + assert indexed == 2 + assert len(errors) == 1 + call_files = m["download_and_index_mock"].call_args[0][2] + assert {f["id"] for f in call_files} == {"f1", "f2"} + + +async def test_gdrive_proportional_page_deduction(gdrive_selected_mocks): + """Pages deducted are proportional to successfully indexed files.""" + m = gdrive_selected_mocks + m["fake_user"].pages_used = 0 + m["fake_user"].pages_limit = 100 + + for fid in ("f1", "f2", "f3", "f4"): + m["get_file_results"][fid] = ( + _make_gdrive_file(fid, f"{fid}.xyz", size=80 * 1024), None, + ) + m["download_and_index_mock"].return_value = (2, 2) + + await _run_gdrive_selected( + m, + [("f1", "f1.xyz"), ("f2", "f2.xyz"), ("f3", "f3.xyz"), ("f4", "f4.xyz")], + ) + + assert m["fake_user"].pages_used == 2 + + +async def test_gdrive_no_deduction_when_nothing_indexed(gdrive_selected_mocks): + """If batch_indexed == 0, user's pages_used stays unchanged.""" + m = gdrive_selected_mocks + m["fake_user"].pages_used = 5 + m["fake_user"].pages_limit = 100 + + m["get_file_results"]["f1"] = ( + _make_gdrive_file("f1", "f1.xyz", size=80 * 1024), None, + ) + m["download_and_index_mock"].return_value = (0, 1) + + await _run_gdrive_selected(m, [("f1", "f1.xyz")]) + + assert m["fake_user"].pages_used == 5 + + +async def test_gdrive_zero_quota_rejects_all(gdrive_selected_mocks): + """When pages_used == pages_limit, every file is rejected.""" + m = gdrive_selected_mocks + m["fake_user"].pages_used = 100 + m["fake_user"].pages_limit = 100 + + for fid in ("f1", "f2"): + m["get_file_results"][fid] = ( + _make_gdrive_file(fid, f"{fid}.xyz", size=80 * 1024), None, + ) + + indexed, _skipped, errors = await _run_gdrive_selected( + m, [("f1", "f1.xyz"), ("f2", "f2.xyz")] + ) + + assert indexed == 0 + assert len(errors) == 2 + + +# --------------------------------------------------------------------------- +# Google Drive: _index_full_scan +# --------------------------------------------------------------------------- + + +@pytest.fixture +def gdrive_full_scan_mocks(monkeypatch): + import app.tasks.connector_indexers.google_drive_indexer as _mod + + session, fake_user = _make_page_limit_session(0, 100) + mock_task_logger = MagicMock() + mock_task_logger.log_task_progress = AsyncMock() + + monkeypatch.setattr( + _mod, "_should_skip_file", AsyncMock(return_value=(False, None)) + ) + + download_mock = AsyncMock(return_value=([], 0)) + monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) + + batch_mock = AsyncMock(return_value=([], 0, 0)) + pipeline_mock = MagicMock() + pipeline_mock.index_batch_parallel = batch_mock + pipeline_mock.create_placeholder_documents = AsyncMock(return_value=0) + monkeypatch.setattr( + _mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock) + ) + monkeypatch.setattr( + _mod, "get_user_long_context_llm", AsyncMock(return_value=MagicMock()) + ) + + return { + "mod": _mod, + "session": session, + "fake_user": fake_user, + "task_logger": mock_task_logger, + "download_mock": download_mock, + "batch_mock": batch_mock, + } + + +async def _run_gdrive_full_scan(mocks, max_files=500): + from app.tasks.connector_indexers.google_drive_indexer import _index_full_scan + + return await _index_full_scan( + MagicMock(), + mocks["session"], + MagicMock(), + _CONNECTOR_ID, + _SEARCH_SPACE_ID, + _USER_ID, + "folder-root", + "My Folder", + mocks["task_logger"], + MagicMock(), + max_files, + include_subfolders=False, + enable_summary=True, + ) + + +async def test_gdrive_full_scan_skips_over_quota(gdrive_full_scan_mocks, monkeypatch): + m = gdrive_full_scan_mocks + m["fake_user"].pages_used = 0 + m["fake_user"].pages_limit = 2 + + page_files = [ + _make_gdrive_file(f"f{i}", f"file{i}.xyz", size=80 * 1024) for i in range(5) + ] + monkeypatch.setattr( + m["mod"], "get_files_in_folder", + AsyncMock(return_value=(page_files, None, None)), + ) + m["download_mock"].return_value = ([], 0) + m["batch_mock"].return_value = ([], 2, 0) + + _indexed, skipped = await _run_gdrive_full_scan(m) + + call_files = m["download_mock"].call_args[0][1] + assert len(call_files) == 2 + assert skipped == 3 + + +async def test_gdrive_full_scan_deducts_after_indexing( + gdrive_full_scan_mocks, monkeypatch +): + m = gdrive_full_scan_mocks + m["fake_user"].pages_used = 0 + m["fake_user"].pages_limit = 100 + + page_files = [ + _make_gdrive_file(f"f{i}", f"file{i}.xyz", size=80 * 1024) for i in range(3) + ] + monkeypatch.setattr( + m["mod"], "get_files_in_folder", + AsyncMock(return_value=(page_files, None, None)), + ) + mock_docs = [MagicMock() for _ in range(3)] + m["download_mock"].return_value = (mock_docs, 0) + m["batch_mock"].return_value = ([], 3, 0) + + await _run_gdrive_full_scan(m) + + assert m["fake_user"].pages_used == 3 + + +# --------------------------------------------------------------------------- +# Google Drive: _index_with_delta_sync +# --------------------------------------------------------------------------- + + +async def test_gdrive_delta_sync_skips_over_quota(monkeypatch): + import app.tasks.connector_indexers.google_drive_indexer as _mod + + session, _ = _make_page_limit_session(0, 2) + + changes = [ + { + "fileId": f"mod{i}", + "file": _make_gdrive_file(f"mod{i}", f"mod{i}.xyz", size=80 * 1024), + } + for i in range(5) + ] + monkeypatch.setattr( + _mod, "fetch_all_changes", + AsyncMock(return_value=(changes, "new-token", None)), + ) + monkeypatch.setattr(_mod, "categorize_change", lambda change: "modified") + monkeypatch.setattr( + _mod, "_should_skip_file", AsyncMock(return_value=(False, None)) + ) + + download_mock = AsyncMock(return_value=([], 0)) + monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) + + batch_mock = AsyncMock(return_value=([], 2, 0)) + pipeline_mock = MagicMock() + pipeline_mock.index_batch_parallel = batch_mock + pipeline_mock.create_placeholder_documents = AsyncMock(return_value=0) + monkeypatch.setattr( + _mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock) + ) + monkeypatch.setattr( + _mod, "get_user_long_context_llm", AsyncMock(return_value=MagicMock()) + ) + + mock_task_logger = MagicMock() + mock_task_logger.log_task_progress = AsyncMock() + + _indexed, skipped = await _mod._index_with_delta_sync( + MagicMock(), session, MagicMock(), + _CONNECTOR_ID, _SEARCH_SPACE_ID, _USER_ID, + "folder-root", "start-token", + mock_task_logger, MagicMock(), + max_files=500, enable_summary=True, + ) + + call_files = download_mock.call_args[0][1] + assert len(call_files) == 2 + assert skipped == 3 + + +# =================================================================== +# C) OneDrive smoke tests — verify page limit wiring +# =================================================================== + + +def _make_onedrive_file(file_id: str, name: str, size: int = 80 * 1024) -> dict: + return { + "id": file_id, + "name": name, + "file": {"mimeType": "application/octet-stream"}, + "size": str(size), + "lastModifiedDateTime": "2026-01-01T00:00:00Z", + } + + +@pytest.fixture +def onedrive_selected_mocks(monkeypatch): + import app.tasks.connector_indexers.onedrive_indexer as _mod + + session, fake_user = _make_page_limit_session(0, 100) + + get_file_results: dict[str, tuple[dict | None, str | None]] = {} + + async def _fake_get_file(client, file_id): + return get_file_results.get(file_id, (None, f"Not found: {file_id}")) + + monkeypatch.setattr(_mod, "get_file_by_id", _fake_get_file) + monkeypatch.setattr( + _mod, "_should_skip_file", AsyncMock(return_value=(False, None)) + ) + + download_and_index_mock = AsyncMock(return_value=(0, 0)) + monkeypatch.setattr(_mod, "_download_and_index", download_and_index_mock) + + pipeline_mock = MagicMock() + pipeline_mock.create_placeholder_documents = AsyncMock(return_value=0) + monkeypatch.setattr( + _mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock) + ) + + return { + "session": session, + "fake_user": fake_user, + "get_file_results": get_file_results, + "download_and_index_mock": download_and_index_mock, + } + + +async def _run_onedrive_selected(mocks, file_ids): + from app.tasks.connector_indexers.onedrive_indexer import _index_selected_files + + return await _index_selected_files( + MagicMock(), mocks["session"], file_ids, + connector_id=_CONNECTOR_ID, search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, enable_summary=True, + ) + + +async def test_onedrive_over_quota_rejected(onedrive_selected_mocks): + """OneDrive: files exceeding quota produce errors, not downloads.""" + m = onedrive_selected_mocks + m["fake_user"].pages_used = 99 + m["fake_user"].pages_limit = 100 + + m["get_file_results"]["big"] = ( + _make_onedrive_file("big", "huge.pdf", size=500 * 1024), None, + ) + + indexed, _skipped, errors = await _run_onedrive_selected(m, [("big", "huge.pdf")]) + + assert indexed == 0 + assert len(errors) == 1 + assert "page limit" in errors[0].lower() + + +async def test_onedrive_deducts_after_success(onedrive_selected_mocks): + """OneDrive: pages_used increases after successful indexing.""" + m = onedrive_selected_mocks + m["fake_user"].pages_used = 0 + m["fake_user"].pages_limit = 100 + + for fid in ("f1", "f2"): + m["get_file_results"][fid] = ( + _make_onedrive_file(fid, f"{fid}.xyz", size=80 * 1024), None, + ) + m["download_and_index_mock"].return_value = (2, 0) + + await _run_onedrive_selected(m, [("f1", "f1.xyz"), ("f2", "f2.xyz")]) + + assert m["fake_user"].pages_used == 2 + + +# =================================================================== +# D) Dropbox smoke tests — verify page limit wiring +# =================================================================== + + +def _make_dropbox_file(file_path: str, name: str, size: int = 80 * 1024) -> dict: + return { + "id": f"id:{file_path}", + "name": name, + ".tag": "file", + "path_lower": file_path, + "size": str(size), + "server_modified": "2026-01-01T00:00:00Z", + "content_hash": f"hash_{name}", + } + + +@pytest.fixture +def dropbox_selected_mocks(monkeypatch): + import app.tasks.connector_indexers.dropbox_indexer as _mod + + session, fake_user = _make_page_limit_session(0, 100) + + get_file_results: dict[str, tuple[dict | None, str | None]] = {} + + async def _fake_get_file(client, file_path): + return get_file_results.get(file_path, (None, f"Not found: {file_path}")) + + monkeypatch.setattr(_mod, "get_file_by_path", _fake_get_file) + monkeypatch.setattr( + _mod, "_should_skip_file", AsyncMock(return_value=(False, None)) + ) + + download_and_index_mock = AsyncMock(return_value=(0, 0)) + monkeypatch.setattr(_mod, "_download_and_index", download_and_index_mock) + + pipeline_mock = MagicMock() + pipeline_mock.create_placeholder_documents = AsyncMock(return_value=0) + monkeypatch.setattr( + _mod, "IndexingPipelineService", MagicMock(return_value=pipeline_mock) + ) + + return { + "session": session, + "fake_user": fake_user, + "get_file_results": get_file_results, + "download_and_index_mock": download_and_index_mock, + } + + +async def _run_dropbox_selected(mocks, file_paths): + from app.tasks.connector_indexers.dropbox_indexer import _index_selected_files + + return await _index_selected_files( + MagicMock(), mocks["session"], file_paths, + connector_id=_CONNECTOR_ID, search_space_id=_SEARCH_SPACE_ID, + user_id=_USER_ID, enable_summary=True, + ) + + +async def test_dropbox_over_quota_rejected(dropbox_selected_mocks): + """Dropbox: files exceeding quota produce errors, not downloads.""" + m = dropbox_selected_mocks + m["fake_user"].pages_used = 99 + m["fake_user"].pages_limit = 100 + + m["get_file_results"]["/huge.pdf"] = ( + _make_dropbox_file("/huge.pdf", "huge.pdf", size=500 * 1024), None, + ) + + indexed, _skipped, errors = await _run_dropbox_selected( + m, [("/huge.pdf", "huge.pdf")] + ) + + assert indexed == 0 + assert len(errors) == 1 + assert "page limit" in errors[0].lower() + + +async def test_dropbox_deducts_after_success(dropbox_selected_mocks): + """Dropbox: pages_used increases after successful indexing.""" + m = dropbox_selected_mocks + m["fake_user"].pages_used = 0 + m["fake_user"].pages_limit = 100 + + for name in ("f1.xyz", "f2.xyz"): + path = f"/{name}" + m["get_file_results"][path] = ( + _make_dropbox_file(path, name, size=80 * 1024), None, + ) + m["download_and_index_mock"].return_value = (2, 0) + + await _run_dropbox_selected(m, [("/f1.xyz", "f1.xyz"), ("/f2.xyz", "f2.xyz")]) + + assert m["fake_user"].pages_used == 2