diff --git a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py
index 58c9f5003..acfbce0bf 100644
--- a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py
@@ -34,6 +34,7 @@ from app.indexing_pipeline.connector_document import ConnectorDocument
 from app.indexing_pipeline.document_hashing import compute_identifier_hash
 from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
 from app.services.llm_service import get_user_long_context_llm
+from app.services.page_limit_service import PageLimitExceededError, PageLimitService
 from app.services.task_logging_service import TaskLoggingService
 from app.tasks.celery_tasks import get_celery_session_maker
 from app.utils.document_versioning import create_version_snapshot
@@ -171,6 +172,39 @@ def _needs_etl(filename: str) -> bool:
 
 
 HeartbeatCallbackType = Callable[[int], Awaitable[None]]
+
+def _estimate_pages_safe(page_limit_service: PageLimitService, file_path: str) -> int:
+    """Estimate page count with a file-size fallback."""
+    try:
+        return page_limit_service.estimate_pages_before_processing(file_path)
+    except Exception:
+        file_size = os.path.getsize(file_path)
+        return max(1, file_size // (80 * 1024))
+
+
+async def _check_page_limit_or_skip(
+    page_limit_service: PageLimitService,
+    user_id: str,
+    file_path: str,
+) -> int:
+    """Estimate pages and check the limit; raises PageLimitExceededError if over quota.
+
+    Returns the estimated page count on success.
+    """
+    estimated = _estimate_pages_safe(page_limit_service, file_path)
+    await page_limit_service.check_page_limit(user_id, estimated)
+    return estimated
+
+
+def _compute_final_pages(
+    page_limit_service: PageLimitService,
+    estimated_pages: int,
+    content_length: int,
+) -> int:
+    """Return the final page count as max(estimated, actual)."""
+    actual = page_limit_service.estimate_pages_from_content_length(content_length)
+    return max(estimated_pages, actual)
+
 DEFAULT_EXCLUDE_PATTERNS = [
     ".git",
     "node_modules",
@@ -720,11 +754,12 @@ async def index_local_folder(
 
     skipped_count = 0
     failed_count = 0
+    page_limit_service = PageLimitService(session)
+
     # ================================================================
     # PHASE 1: Pre-filter files (mtime / content-hash), version changed
     # ================================================================
     connector_docs: list[ConnectorDocument] = []
-    # Maps unique_id -> (relative_path, mtime) for post-pipeline folder_id assignment
     file_meta_map: dict[str, dict] = {}
     seen_unique_hashes: set[str] = set()
 
@@ -759,6 +794,17 @@
                     skipped_count += 1
                     continue
 
+                try:
+                    estimated_pages = await _check_page_limit_or_skip(
+                        page_limit_service, user_id, file_path_abs
+                    )
+                except PageLimitExceededError:
+                    logger.warning(
+                        f"Page limit exceeded, skipping: {file_path_abs}"
+                    )
+                    failed_count += 1
+                    continue
+
                 try:
                     content, content_hash = await _compute_file_content_hash(
                         file_path_abs, file_info["relative_path"], search_space_id
@@ -781,6 +827,17 @@
 
                         await create_version_snapshot(session, existing_document)
                 else:
+                    try:
+                        estimated_pages = await _check_page_limit_or_skip(
+                            page_limit_service, user_id, file_path_abs
+                        )
+                    except PageLimitExceededError:
+                        logger.warning(
+                            f"Page limit exceeded, skipping: {file_path_abs}"
+                        )
+                        failed_count += 1
+                        continue
+
                     try:
                         content, content_hash = await _compute_file_content_hash(
                             file_path_abs, file_info["relative_path"], search_space_id
@@ -807,6 +864,8 @@
                 file_meta_map[unique_identifier] = {
                     "relative_path": relative_path,
                     "mtime": file_info["modified_at"].timestamp(),
+                    "estimated_pages": estimated_pages,
+                    "content_length": len(content),
                 }
 
             except Exception as e:
@@ -901,6 +960,15 @@
                     doc_meta = dict(result.document_metadata or {})
                     doc_meta["mtime"] = mtime_info.get("mtime")
                     result.document_metadata = doc_meta
+
+                    est = mtime_info.get("estimated_pages", 1)
+                    content_len = mtime_info.get("content_length", 0)
+                    final_pages = _compute_final_pages(
+                        page_limit_service, est, content_len
+                    )
+                    await page_limit_service.update_page_usage(
+                        user_id, final_pages, allow_exceed=True
+                    )
                 else:
                     failed_count += 1
 
@@ -1084,6 +1152,14 @@ async def _index_single_file(
         DocumentType.LOCAL_FOLDER_FILE.value, unique_id, search_space_id
     )
 
+    page_limit_service = PageLimitService(session)
+    try:
+        estimated_pages = await _check_page_limit_or_skip(
+            page_limit_service, user_id, str(full_path)
+        )
+    except PageLimitExceededError as e:
+        return 0, 1, f"Page limit exceeded: {e}"
+
    try:
         content, content_hash = await _compute_file_content_hash(
             str(full_path), full_path.name, search_space_id
@@ -1128,8 +1204,6 @@
 
     db_doc = documents[0]
 
-    # Assign folder_id before indexing so the doc appears in the
-    # correct folder while still pending/processing.
     if root_folder_id:
         try:
             db_doc.folder_id = await _resolve_folder_for_file(
@@ -1154,10 +1228,16 @@
 
     failed_msg = None if indexed else "Indexing failed"
     if indexed:
+        final_pages = _compute_final_pages(
+            page_limit_service, estimated_pages, len(content)
+        )
+        await page_limit_service.update_page_usage(
+            user_id, final_pages, allow_exceed=True
+        )
         await task_logger.log_task_success(
             log_entry,
             f"Single file indexed: {rel_path}",
-            {"file": rel_path},
+            {"file": rel_path, "pages_processed": final_pages},
         )
 
     return indexed, 0 if indexed else 1, failed_msg
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py
index b94762ee6..4d9bda7ee 100644
--- a/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py
@@ -959,3 +959,222 @@ class TestDirectConvert:
 
         assert "| name" in doc.source_markdown
         assert "name,age,city" not in doc.source_markdown
+
+
+# ====================================================================
+# Tier 8: Page Limits (PL1-PL6)
+# ====================================================================
+
+
+class TestPageLimits:
+    @pytest.mark.usefixtures(*UNIFIED_FIXTURES)
+    async def test_pl1_full_scan_increments_pages_used(
+        self,
+        db_session: AsyncSession,
+        db_user: User,
+        db_search_space: SearchSpace,
+        tmp_path: Path,
+    ):
+        """PL1: Successful full-scan sync increments user.pages_used."""
+        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
+
+        db_user.pages_used = 0
+        db_user.pages_limit = 500
+        await db_session.flush()
+
+        (tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
+
+        count, _skipped, _root_folder_id, err = await index_local_folder(
+            session=db_session,
+            search_space_id=db_search_space.id,
+            user_id=str(db_user.id),
+            folder_path=str(tmp_path),
+            folder_name="test-folder",
+        )
+
+        assert err is None
+        assert count == 1
+
+        await db_session.refresh(db_user)
+        assert db_user.pages_used > 0, "pages_used should increase after indexing"
+
+    @pytest.mark.usefixtures(*UNIFIED_FIXTURES)
+    async def test_pl2_full_scan_blocked_when_limit_exhausted(
+        self,
+        db_session: AsyncSession,
+        db_user: User,
+        db_search_space: SearchSpace,
+        tmp_path: Path,
+    ):
+        """PL2: Full-scan skips file when page limit is exhausted."""
+        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
+
+        db_user.pages_used = 100
+        db_user.pages_limit = 100
+        await db_session.flush()
+
+        (tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
+
+        count, _skipped, _root_folder_id, err = await index_local_folder(
+            session=db_session,
+            search_space_id=db_search_space.id,
+            user_id=str(db_user.id),
+            folder_path=str(tmp_path),
+            folder_name="test-folder",
+        )
+
+        assert count == 0
+
+        await db_session.refresh(db_user)
+        assert db_user.pages_used == 100, "pages_used should not change on rejection"
+
+    @pytest.mark.usefixtures(*UNIFIED_FIXTURES)
+    async def test_pl3_single_file_increments_pages_used(
+        self,
+        db_session: AsyncSession,
+        db_user: User,
+        db_search_space: SearchSpace,
+        tmp_path: Path,
+    ):
+        """PL3: Single-file mode increments user.pages_used on success."""
+        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
+
+        db_user.pages_used = 0
+        db_user.pages_limit = 500
+        await db_session.flush()
+
+        (tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
+
+        count, _skipped, _root_folder_id, err = await index_local_folder(
+            session=db_session,
+            search_space_id=db_search_space.id,
+            user_id=str(db_user.id),
+            folder_path=str(tmp_path),
+            folder_name="test-folder",
+            target_file_paths=[str(tmp_path / "note.md")],
+        )
+
+        assert err is None
+        assert count == 1
+
+        await db_session.refresh(db_user)
+        assert db_user.pages_used > 0, "pages_used should increase after indexing"
+
+    @pytest.mark.usefixtures(*UNIFIED_FIXTURES)
+    async def test_pl4_single_file_blocked_when_limit_exhausted(
+        self,
+        db_session: AsyncSession,
+        db_user: User,
+        db_search_space: SearchSpace,
+        tmp_path: Path,
+    ):
+        """PL4: Single-file mode skips file when page limit is exhausted."""
+        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
+
+        db_user.pages_used = 100
+        db_user.pages_limit = 100
+        await db_session.flush()
+
+        (tmp_path / "note.md").write_text("# Hello World\n\nContent here.")
+
+        count, _skipped, _root_folder_id, err = await index_local_folder(
+            session=db_session,
+            search_space_id=db_search_space.id,
+            user_id=str(db_user.id),
+            folder_path=str(tmp_path),
+            folder_name="test-folder",
+            target_file_paths=[str(tmp_path / "note.md")],
+        )
+
+        assert count == 0
+        assert err is not None
+        assert "page limit" in err.lower()
+
+        await db_session.refresh(db_user)
+        assert db_user.pages_used == 100, "pages_used should not change on rejection"
+
+    @pytest.mark.usefixtures(*UNIFIED_FIXTURES)
+    async def test_pl5_unchanged_resync_no_extra_pages(
+        self,
+        db_session: AsyncSession,
+        db_user: User,
+        db_search_space: SearchSpace,
+        tmp_path: Path,
+    ):
+        """PL5: Re-syncing an unchanged file does not consume additional pages."""
+        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
+
+        db_user.pages_used = 0
+        db_user.pages_limit = 500
+        await db_session.flush()
+
+        (tmp_path / "note.md").write_text("# Hello\n\nSame content.")
+
+        count1, _, root_folder_id, _ = await index_local_folder(
+            session=db_session,
+            search_space_id=db_search_space.id,
+            user_id=str(db_user.id),
+            folder_path=str(tmp_path),
+            folder_name="test-folder",
+        )
+        assert count1 == 1
+
+        await db_session.refresh(db_user)
+        pages_after_first = db_user.pages_used
+        assert pages_after_first > 0
+
+        count2, _, _, _ = await index_local_folder(
+            session=db_session,
+            search_space_id=db_search_space.id,
+            user_id=str(db_user.id),
+            folder_path=str(tmp_path),
+            folder_name="test-folder",
+            root_folder_id=root_folder_id,
+        )
+        assert count2 == 0
+
+        await db_session.refresh(db_user)
+        assert db_user.pages_used == pages_after_first, (
+            "pages_used should not increase for unchanged files"
+        )
+
+    @pytest.mark.usefixtures(*UNIFIED_FIXTURES)
+    async def test_pl6_batch_partial_page_limit_exhaustion(
+        self,
+        db_session: AsyncSession,
+        db_user: User,
+        db_search_space: SearchSpace,
+        tmp_path: Path,
+        patched_batch_sessions,
+    ):
+        """PL6: Batch mode with a very low page limit: some files succeed, rest fail."""
+        from app.tasks.connector_indexers.local_folder_indexer import index_local_folder
+
+        db_user.pages_used = 0
+        db_user.pages_limit = 1
+        await db_session.flush()
+
+        (tmp_path / "a.md").write_text("File A content")
+        (tmp_path / "b.md").write_text("File B content")
+        (tmp_path / "c.md").write_text("File C content")
+
+        count, failed, _root_folder_id, _err = await index_local_folder(
+            session=db_session,
+            search_space_id=db_search_space.id,
+            user_id=str(db_user.id),
+            folder_path=str(tmp_path),
+            folder_name="test-folder",
+            target_file_paths=[
+                str(tmp_path / "a.md"),
+                str(tmp_path / "b.md"),
+                str(tmp_path / "c.md"),
+            ],
+        )
+
+        assert count >= 1, "at least one file should succeed"
+        assert failed >= 1, "at least one file should fail due to page limit"
+        assert count + failed == 3
+
+        await db_session.refresh(db_user)
+        assert db_user.pages_used > 0
+        assert db_user.pages_used <= db_user.pages_limit + 1