diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py index edb01d4cc..e6eed7836 100644 --- a/surfsense_backend/app/routes/documents_routes.py +++ b/surfsense_backend/app/routes/documents_routes.py @@ -1305,11 +1305,11 @@ class FolderIndexRequest(PydanticBaseModel): enable_summary: bool = False -class FolderIndexFileRequest(PydanticBaseModel): +class FolderIndexFilesRequest(PydanticBaseModel): folder_path: str folder_name: str search_space_id: int - target_file_path: str + target_file_paths: list[str] root_folder_id: int | None = None enable_summary: bool = False @@ -1393,14 +1393,15 @@ async def folder_index( } -@router.post("/documents/folder-index-file") -async def folder_index_file( - request: FolderIndexFileRequest, +@router.post("/documents/folder-index-files") +async def folder_index_files( + request: FolderIndexFilesRequest, session: AsyncSession = Depends(get_async_session), user: User = Depends(current_active_user), ): - """Index a single file within a watched folder (chokidar trigger). - Validates that target_file_path is under folder_path. + """Index multiple files within a watched folder (batched chokidar trigger). + Validates that all target_file_paths are under folder_path. + Dispatches a single Celery task that processes them in parallel. """ from app.config import config as app_config @@ -1410,6 +1411,9 @@ async def folder_index_file( detail="Local folder indexing is only available in self-hosted mode", ) + if not request.target_file_paths: + raise HTTPException(status_code=400, detail="target_file_paths must not be empty") + await check_permission( session, user, @@ -1420,13 +1424,14 @@ async def folder_index_file( from pathlib import Path - try: - Path(request.target_file_path).relative_to(request.folder_path) - except ValueError: - raise HTTPException( - status_code=400, - detail="target_file_path must be inside folder_path", - ) + for fp in request.target_file_paths: + try: + Path(fp).relative_to(request.folder_path) + except ValueError: + raise HTTPException( + status_code=400, + detail=f"target_file_path {fp} must be inside folder_path", + ) from app.tasks.celery_tasks.document_tasks import index_local_folder_task @@ -1435,14 +1440,15 @@ async def folder_index_file( user_id=str(user.id), folder_path=request.folder_path, folder_name=request.folder_name, - target_file_path=request.target_file_path, + target_file_paths=request.target_file_paths, root_folder_id=request.root_folder_id, enable_summary=request.enable_summary, ) return { - "message": "File indexing started", + "message": f"Batch indexing started for {len(request.target_file_paths)} file(s)", "status": "processing", + "file_count": len(request.target_file_paths), } diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 4701d9911..16ac50967 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -1275,7 +1275,7 @@ def index_local_folder_task( file_extensions: list[str] | None = None, root_folder_id: int | None = None, enable_summary: bool = False, - target_file_path: str | None = None, + target_file_paths: list[str] | None = None, ): """Celery task to index a local folder. Config is passed directly — no connector row.""" loop = asyncio.new_event_loop() @@ -1292,7 +1292,7 @@ def index_local_folder_task( file_extensions=file_extensions, root_folder_id=root_folder_id, enable_summary=enable_summary, - target_file_path=target_file_path, + target_file_paths=target_file_paths, ) ) finally: @@ -1308,19 +1308,103 @@ async def _index_local_folder_async( file_extensions: list[str] | None = None, root_folder_id: int | None = None, enable_summary: bool = False, - target_file_path: str | None = None, + target_file_paths: list[str] | None = None, ): - """Run local folder indexing with a fresh DB session.""" + """Run local folder indexing with notification + heartbeat.""" + is_batch = bool(target_file_paths) + is_full_scan = not target_file_paths + file_count = len(target_file_paths) if target_file_paths else None + + if is_batch: + doc_name = f"{folder_name} ({file_count} file{'s' if file_count != 1 else ''})" + else: + doc_name = folder_name + + notification = None + heartbeat_task = None + async with get_celery_session_maker()() as session: - await index_local_folder( - session=session, - search_space_id=search_space_id, - user_id=user_id, - folder_path=folder_path, - folder_name=folder_name, - exclude_patterns=exclude_patterns, - file_extensions=file_extensions, - root_folder_id=root_folder_id, - enable_summary=enable_summary, - target_file_path=target_file_path, - ) + try: + notification = ( + await NotificationService.document_processing.notify_processing_started( + session=session, + user_id=UUID(user_id), + document_type="LOCAL_FOLDER_FILE", + document_name=doc_name, + search_space_id=search_space_id, + ) + ) + _start_heartbeat(notification.id) + heartbeat_task = asyncio.create_task( + _run_heartbeat_loop(notification.id) + ) + except Exception: + logger.warning( + "Failed to create notification for local folder indexing", + exc_info=True, + ) + + async def _heartbeat_progress(completed_count: int) -> None: + """Refresh heartbeat and optionally update notification progress.""" + if notification: + try: + await NotificationService.document_processing.notify_processing_progress( + session=session, + notification=notification, + stage="indexing", + stage_message=f"Syncing files ({completed_count}/{file_count or '?'})", + ) + except Exception: + pass + + try: + indexed, skipped_or_failed, _rfid, err = await index_local_folder( + session=session, + search_space_id=search_space_id, + user_id=user_id, + folder_path=folder_path, + folder_name=folder_name, + exclude_patterns=exclude_patterns, + file_extensions=file_extensions, + root_folder_id=root_folder_id, + enable_summary=enable_summary, + target_file_paths=target_file_paths, + on_heartbeat_callback=_heartbeat_progress if (is_batch or is_full_scan) else None, + ) + + if notification: + try: + if err: + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=err, + ) + else: + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + ) + except Exception: + logger.warning( + "Failed to update notification after local folder indexing", + exc_info=True, + ) + + except Exception as e: + logger.exception(f"Local folder indexing failed: {e}") + if notification: + try: + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=str(e)[:200], + ) + except Exception: + pass + raise + finally: + if heartbeat_task: + heartbeat_task.cancel() + if notification: + _stop_heartbeat(notification.id) diff --git a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py index 7f6a35d7f..4ac8cc594 100644 --- a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py @@ -3,7 +3,7 @@ Local folder indexer. Indexes files from a local folder on disk. Supports: - Full-scan mode (startup reconciliation / manual trigger) -- Single-file mode (chokidar real-time trigger) +- Batch mode (chokidar real-time trigger, 1..N files) - Filesystem folder structure mirroring into DB Folder rows - Document versioning via create_version_snapshot - ETL-based file parsing for binary formats (PDF, DOCX, images, audio, etc.) @@ -13,6 +13,7 @@ Config (folder_path, exclude_patterns, etc.) is passed in from the caller — no connector row is read. """ +import asyncio import os from collections.abc import Awaitable, Callable from datetime import UTC, datetime @@ -34,6 +35,7 @@ from app.indexing_pipeline.document_hashing import compute_identifier_hash from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService +from app.tasks.celery_tasks import get_celery_session_maker from app.utils.document_versioning import create_version_snapshot from .base import ( @@ -497,14 +499,15 @@ async def index_local_folder( file_extensions: list[str] | None = None, root_folder_id: int | None = None, enable_summary: bool = False, - target_file_path: str | None = None, + target_file_paths: list[str] | None = None, on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int, int | None, str | None]: """Index files from a local folder. Supports two modes: - - Full scan (target_file_path=None): walks entire folder, handles new/changed/deleted files. - - Single-file (target_file_path set): processes only that file. + - Batch (target_file_paths set): processes 1..N files. + Single-file uses the caller's session; multi-file fans out with per-file sessions. + - Full scan (no target paths): walks entire folder, handles new/changed/deleted files. Returns (indexed_count, skipped_count, root_folder_id, error_or_warning_message). """ @@ -517,7 +520,7 @@ async def index_local_folder( metadata={ "folder_path": folder_path, "user_id": str(user_id), - "target_file_path": target_file_path, + "target_file_paths_count": len(target_file_paths) if target_file_paths else None, }, ) @@ -535,22 +538,47 @@ async def index_local_folder( exclude_patterns = DEFAULT_EXCLUDE_PATTERNS # ==================================================================== - # SINGLE-FILE MODE + # BATCH MODE (1..N files) # ==================================================================== - if target_file_path: - indexed, skipped, err = await _index_single_file( - session=session, + if target_file_paths: + if len(target_file_paths) == 1: + indexed, skipped, err = await _index_single_file( + session=session, + search_space_id=search_space_id, + user_id=user_id, + folder_path=folder_path, + folder_name=folder_name, + target_file_path=target_file_paths[0], + enable_summary=enable_summary, + root_folder_id=root_folder_id, + task_logger=task_logger, + log_entry=log_entry, + ) + return indexed, skipped, root_folder_id, err + + indexed, failed, err = await _index_batch_files( search_space_id=search_space_id, user_id=user_id, folder_path=folder_path, folder_name=folder_name, - target_file_path=target_file_path, + target_file_paths=target_file_paths, enable_summary=enable_summary, root_folder_id=root_folder_id, - task_logger=task_logger, - log_entry=log_entry, + on_progress_callback=on_heartbeat_callback, ) - return indexed, skipped, root_folder_id, err + if err: + await task_logger.log_task_success( + log_entry, + f"Batch indexing: {indexed} indexed, {failed} failed", + {"indexed": indexed, "failed": failed}, + ) + else: + await task_logger.log_task_success( + log_entry, + f"Batch indexing complete: {indexed} indexed", + {"indexed": indexed, "failed": failed}, + ) + return indexed, failed, root_folder_id, err # ==================================================================== # FULL-SCAN MODE @@ -822,6 +850,84 @@ async def index_local_folder( return 0, 0, root_folder_id, str(e) +BATCH_CONCURRENCY = 5 + + +async def _index_batch_files( + search_space_id: int, + user_id: str, + folder_path: str, + folder_name: str, + target_file_paths: list[str], + enable_summary: bool, + root_folder_id: int | None, + on_progress_callback: HeartbeatCallbackType | None = None, +) -> tuple[int, int, str | None]: + """Process multiple files in parallel with bounded concurrency. + + Each file gets its own DB session so they can run concurrently. + Returns (indexed_count, failed_count, error_summary_or_none). + """ + semaphore = asyncio.Semaphore(BATCH_CONCURRENCY) + indexed = 0 + failed = 0 + errors: list[str] = [] + lock = asyncio.Lock() + completed = 0 + + async def process_one(file_path: str) -> None: + nonlocal indexed, failed, completed + async with semaphore: + try: + async with get_celery_session_maker()() as file_session: + task_logger = TaskLoggingService(file_session, search_space_id) + log_entry = await task_logger.log_task_start( + task_name="local_folder_indexing", + source="local_folder_batch_indexing", + message=f"Batch: indexing {Path(file_path).name}", + metadata={"file_path": file_path}, + ) + ix, _sk, err = await _index_single_file( + session=file_session, + search_space_id=search_space_id, + user_id=user_id, + folder_path=folder_path, + folder_name=folder_name, + target_file_path=file_path, + enable_summary=enable_summary, + root_folder_id=root_folder_id, + task_logger=task_logger, + log_entry=log_entry, + ) + async with lock: + indexed += ix + if err: + failed += 1 + errors.append(f"{Path(file_path).name}: {err}") + completed += 1 + if on_progress_callback and completed % BATCH_CONCURRENCY == 0: + await on_progress_callback(completed) + except Exception as exc: + logger.exception(f"Batch: error processing {file_path}: {exc}") + async with lock: + failed += 1 + completed += 1 + errors.append(f"{Path(file_path).name}: {exc}") + + await asyncio.gather(*[process_one(fp) for fp in target_file_paths]) + + if on_progress_callback: + await on_progress_callback(completed) + + error_summary = None + if errors: + error_summary = f"{failed} file(s) failed: " + "; ".join(errors[:5]) + if len(errors) > 5: + error_summary += f" ... and {len(errors) - 5} more" + + return indexed, failed, error_summary + + async def _index_single_file( session: AsyncSession, search_space_id: int, diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py index 154cc6e0e..6cc5655c4 100644 --- a/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_local_folder_pipeline.py @@ -215,7 +215,7 @@ class TestFullIndexer: db_search_space: SearchSpace, tmp_path: Path, ): - """I5: Single-file mode only processes the specified file.""" + """I5: Batch mode with a single file only processes that file.""" from app.tasks.connector_indexers.local_folder_indexer import index_local_folder (tmp_path / "a.md").write_text("File A") @@ -228,7 +228,7 @@ class TestFullIndexer: user_id=str(db_user.id), folder_path=str(tmp_path), folder_name="test-folder", - target_file_path=str(tmp_path / "b.md"), + target_file_paths=[str(tmp_path / "b.md")], ) assert count == 1 @@ -507,7 +507,7 @@ class TestFolderMirroring: user_id=str(db_user.id), folder_path=str(tmp_path), folder_name="test-folder", - target_file_path=str(sub / "new.md"), + target_file_paths=[str(sub / "new.md")], root_folder_id=root_folder_id, ) assert count == 1 @@ -546,7 +546,7 @@ class TestFolderMirroring: db_search_space: SearchSpace, tmp_path: Path, ): - """F7: Deleting the only file in a subfolder via single-file mode removes empty Folder rows.""" + """F7: Deleting the only file in a subfolder via batch mode removes empty Folder rows.""" from app.tasks.connector_indexers.local_folder_indexer import index_local_folder sub = tmp_path / "notes" / "ephemeral" @@ -578,7 +578,7 @@ class TestFolderMirroring: user_id=str(db_user.id), folder_path=str(tmp_path), folder_name="test-folder", - target_file_path=str(target), + target_file_paths=[str(target)], root_folder_id=root_folder_id, ) diff --git a/surfsense_web/hooks/use-folder-sync.ts b/surfsense_web/hooks/use-folder-sync.ts index 59c061afb..a64add593 100644 --- a/surfsense_web/hooks/use-folder-sync.ts +++ b/surfsense_web/hooks/use-folder-sync.ts @@ -16,65 +16,82 @@ interface FileChangedEvent { } const DEBOUNCE_MS = 2000; -interface QueueItem { - event: FileChangedEvent; + +interface BatchItem { + folderPath: string; + folderName: string; + searchSpaceId: number; + rootFolderId: number | null; + filePaths: string[]; ackIds: string[]; } export function useFolderSync() { - const queueRef = useRef([]); + const queueRef = useRef([]); const processingRef = useRef(false); const debounceTimers = useRef>>(new Map()); - const pendingByKey = useRef>(new Map()); + const pendingByFolder = useRef>(new Map()); const isMountedRef = useRef(false); async function processQueue() { if (processingRef.current) return; processingRef.current = true; while (queueRef.current.length > 0) { - const item = queueRef.current.shift()!; + const batch = queueRef.current.shift()!; try { - await documentsApiService.folderIndexFile(item.event.searchSpaceId, { - folder_path: item.event.folderPath, - folder_name: item.event.folderName, - search_space_id: item.event.searchSpaceId, - target_file_path: item.event.fullPath, - root_folder_id: item.event.rootFolderId, + await documentsApiService.folderIndexFiles(batch.searchSpaceId, { + folder_path: batch.folderPath, + folder_name: batch.folderName, + search_space_id: batch.searchSpaceId, + target_file_paths: batch.filePaths, + root_folder_id: batch.rootFolderId, }); const api = typeof window !== "undefined" ? window.electronAPI : null; - if (api?.acknowledgeFileEvents && item.ackIds.length > 0) { - await api.acknowledgeFileEvents(item.ackIds); + if (api?.acknowledgeFileEvents && batch.ackIds.length > 0) { + await api.acknowledgeFileEvents(batch.ackIds); } } catch (err) { - console.error("[FolderSync] Failed to trigger re-index:", err); + console.error("[FolderSync] Failed to trigger batch re-index:", err); } } processingRef.current = false; } function enqueueWithDebounce(event: FileChangedEvent) { - const key = `${event.folderPath}:${event.relativePath}`; - const existing = pendingByKey.current.get(key); - const ackSet = new Set(existing?.ackIds ?? []); - ackSet.add(event.id); - pendingByKey.current.set(key, { - event, - ackIds: Array.from(ackSet), - }); + const folderKey = event.folderPath; + const existing = pendingByFolder.current.get(folderKey); - const existingTimeout = debounceTimers.current.get(key); + if (existing) { + const pathSet = new Set(existing.filePaths); + pathSet.add(event.fullPath); + existing.filePaths = Array.from(pathSet); + if (!existing.ackIds.includes(event.id)) { + existing.ackIds.push(event.id); + } + } else { + pendingByFolder.current.set(folderKey, { + folderPath: event.folderPath, + folderName: event.folderName, + searchSpaceId: event.searchSpaceId, + rootFolderId: event.rootFolderId, + filePaths: [event.fullPath], + ackIds: [event.id], + }); + } + + const existingTimeout = debounceTimers.current.get(folderKey); if (existingTimeout) clearTimeout(existingTimeout); const timeout = setTimeout(() => { - debounceTimers.current.delete(key); - const pending = pendingByKey.current.get(key); + debounceTimers.current.delete(folderKey); + const pending = pendingByFolder.current.get(folderKey); if (!pending) return; - pendingByKey.current.delete(key); + pendingByFolder.current.delete(folderKey); queueRef.current.push(pending); processQueue(); }, DEBOUNCE_MS); - debounceTimers.current.set(key, timeout); + debounceTimers.current.set(folderKey, timeout); } useEffect(() => { @@ -108,7 +125,7 @@ export function useFolderSync() { clearTimeout(timeout); } debounceTimers.current.clear(); - pendingByKey.current.clear(); + pendingByFolder.current.clear(); }; }, []); } diff --git a/surfsense_web/lib/apis/documents-api.service.ts b/surfsense_web/lib/apis/documents-api.service.ts index e8b228d03..45d9f6dc8 100644 --- a/surfsense_web/lib/apis/documents-api.service.ts +++ b/surfsense_web/lib/apis/documents-api.service.ts @@ -400,8 +400,8 @@ class DocumentsApiService { return baseApiService.post(`/api/v1/documents/folder-index`, undefined, { body }); }; - folderIndexFile = async (searchSpaceId: number, body: { folder_path: string; folder_name: string; search_space_id: number; target_file_path: string; root_folder_id?: number | null; enable_summary?: boolean }) => { - return baseApiService.post(`/api/v1/documents/folder-index-file`, undefined, { body }); + folderIndexFiles = async (searchSpaceId: number, body: { folder_path: string; folder_name: string; search_space_id: number; target_file_paths: string[]; root_folder_id?: number | null; enable_summary?: boolean }) => { + return baseApiService.post(`/api/v1/documents/folder-index-files`, undefined, { body }); }; getWatchedFolders = async (searchSpaceId: number) => {