diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py index 5008b1a10..c28fddfe0 100644 --- a/surfsense_backend/app/routes/documents_routes.py +++ b/surfsense_backend/app/routes/documents_routes.py @@ -1543,3 +1543,379 @@ async def folder_index_files( "status": "processing", "file_count": len(request.target_file_paths), } + + +# ===== Upload-based local folder indexing endpoints ===== +# These work for ALL deployment modes (cloud, self-hosted remote, self-hosted local). +# The desktop app reads files locally and uploads them here. + + +class FolderMtimeCheckFile(PydanticBaseModel): + relative_path: str + mtime: float + + +class FolderMtimeCheckRequest(PydanticBaseModel): + folder_name: str + search_space_id: int + files: list[FolderMtimeCheckFile] + + +class FolderUnlinkRequest(PydanticBaseModel): + folder_name: str + search_space_id: int + root_folder_id: int | None = None + relative_paths: list[str] + + +class FolderSyncFinalizeRequest(PydanticBaseModel): + folder_name: str + search_space_id: int + root_folder_id: int | None = None + all_relative_paths: list[str] + + +@router.post("/documents/folder-mtime-check") +async def folder_mtime_check( + request: FolderMtimeCheckRequest, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """Pre-upload optimization: check which files need uploading based on mtime. + + Returns the subset of relative paths where the file is new or has a + different mtime, so the client can skip reading/uploading unchanged files. + """ + from app.indexing_pipeline.document_hashing import compute_identifier_hash + + await check_permission( + session, + user, + request.search_space_id, + Permission.DOCUMENTS_CREATE.value, + "You don't have permission to create documents in this search space", + ) + + uid_hashes = {} + for f in request.files: + uid = f"{request.folder_name}:{f.relative_path}" + uid_hash = compute_identifier_hash( + DocumentType.LOCAL_FOLDER_FILE.value, uid, request.search_space_id + ) + uid_hashes[uid_hash] = f + + existing_docs = ( + ( + await session.execute( + select(Document).where( + Document.unique_identifier_hash.in_(list(uid_hashes.keys())), + Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + ) + ) + ) + .scalars() + .all() + ) + + existing_by_hash = {doc.unique_identifier_hash: doc for doc in existing_docs} + + MTIME_TOLERANCE = 1.0 + files_to_upload: list[str] = [] + + for uid_hash, file_info in uid_hashes.items(): + doc = existing_by_hash.get(uid_hash) + if doc is None: + files_to_upload.append(file_info.relative_path) + continue + + stored_mtime = (doc.document_metadata or {}).get("mtime") + if stored_mtime is None: + files_to_upload.append(file_info.relative_path) + continue + + if abs(file_info.mtime - stored_mtime) >= MTIME_TOLERANCE: + files_to_upload.append(file_info.relative_path) + + return {"files_to_upload": files_to_upload} + + +@router.post("/documents/folder-upload") +async def folder_upload( + files: list[UploadFile], + folder_name: str = Form(...), + search_space_id: int = Form(...), + relative_paths: str = Form(...), + root_folder_id: int | None = Form(None), + enable_summary: bool = Form(False), + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """Upload files from the desktop app for folder indexing. + + Files are written to temp storage and dispatched to a Celery task. + Works for all deployment modes (no is_self_hosted guard). + """ + import json + import tempfile + + await check_permission( + session, + user, + search_space_id, + Permission.DOCUMENTS_CREATE.value, + "You don't have permission to create documents in this search space", + ) + + if not files: + raise HTTPException(status_code=400, detail="No files provided") + + try: + rel_paths: list[str] = json.loads(relative_paths) + except (json.JSONDecodeError, TypeError) as e: + raise HTTPException( + status_code=400, detail=f"Invalid relative_paths JSON: {e}" + ) from e + + if len(rel_paths) != len(files): + raise HTTPException( + status_code=400, + detail=f"Mismatch: {len(files)} files but {len(rel_paths)} relative_paths", + ) + + for file in files: + file_size = file.size or 0 + if file_size > MAX_FILE_SIZE_BYTES: + raise HTTPException( + status_code=413, + detail=f"File '{file.filename}' ({file_size / (1024 * 1024):.1f} MB) " + f"exceeds the {MAX_FILE_SIZE_BYTES // (1024 * 1024)} MB per-file limit.", + ) + + if not root_folder_id: + watched_metadata = { + "watched": True, + "folder_path": folder_name, + } + existing_root = ( + await session.execute( + select(Folder).where( + Folder.name == folder_name, + Folder.parent_id.is_(None), + Folder.search_space_id == search_space_id, + ) + ) + ).scalar_one_or_none() + + if existing_root: + root_folder_id = existing_root.id + existing_root.folder_metadata = watched_metadata + else: + root_folder = Folder( + name=folder_name, + search_space_id=search_space_id, + created_by_id=str(user.id), + position="a0", + folder_metadata=watched_metadata, + ) + session.add(root_folder) + await session.flush() + root_folder_id = root_folder.id + + await session.commit() + + async def _read_and_save(file: UploadFile, idx: int) -> dict: + content = await file.read() + filename = file.filename or rel_paths[idx].split("/")[-1] + + def _write_temp() -> str: + with tempfile.NamedTemporaryFile( + delete=False, suffix=os.path.splitext(filename)[1] + ) as tmp: + tmp.write(content) + return tmp.name + + temp_path = await asyncio.to_thread(_write_temp) + return { + "temp_path": temp_path, + "relative_path": rel_paths[idx], + "filename": filename, + } + + file_mappings = await asyncio.gather( + *(_read_and_save(f, i) for i, f in enumerate(files)) + ) + + from app.tasks.celery_tasks.document_tasks import ( + index_uploaded_folder_files_task, + ) + + index_uploaded_folder_files_task.delay( + search_space_id=search_space_id, + user_id=str(user.id), + folder_name=folder_name, + root_folder_id=root_folder_id, + enable_summary=enable_summary, + file_mappings=list(file_mappings), + ) + + return { + "message": f"Folder upload started for {len(files)} file(s)", + "status": "processing", + "root_folder_id": root_folder_id, + "file_count": len(files), + } + + +@router.post("/documents/folder-unlink") +async def folder_unlink( + request: FolderUnlinkRequest, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """Handle file deletion events from the desktop watcher. + + For each relative path, find the matching document and delete it. + """ + from app.indexing_pipeline.document_hashing import compute_identifier_hash + from app.tasks.connector_indexers.local_folder_indexer import ( + _cleanup_empty_folder_chain, + ) + + await check_permission( + session, + user, + request.search_space_id, + Permission.DOCUMENTS_DELETE.value, + "You don't have permission to delete documents in this search space", + ) + + deleted_count = 0 + + for rel_path in request.relative_paths: + unique_id = f"{request.folder_name}:{rel_path}" + uid_hash = compute_identifier_hash( + DocumentType.LOCAL_FOLDER_FILE.value, + unique_id, + request.search_space_id, + ) + + existing = ( + await session.execute( + select(Document).where( + Document.unique_identifier_hash == uid_hash + ) + ) + ).scalar_one_or_none() + + if existing: + deleted_folder_id = existing.folder_id + await session.delete(existing) + await session.flush() + + if deleted_folder_id and request.root_folder_id: + await _cleanup_empty_folder_chain( + session, deleted_folder_id, request.root_folder_id + ) + deleted_count += 1 + + await session.commit() + return {"deleted_count": deleted_count} + + +@router.post("/documents/folder-sync-finalize") +async def folder_sync_finalize( + request: FolderSyncFinalizeRequest, + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """Finalize a full folder scan by deleting orphaned documents. + + The client sends the complete list of relative paths currently in the + folder. Any document in the DB for this folder that is NOT in the list + gets deleted. + """ + from app.indexing_pipeline.document_hashing import compute_identifier_hash + from app.tasks.connector_indexers.local_folder_indexer import ( + _cleanup_empty_folders, + ) + + await check_permission( + session, + user, + request.search_space_id, + Permission.DOCUMENTS_DELETE.value, + "You don't have permission to delete documents in this search space", + ) + + seen_hashes: set[str] = set() + for rel_path in request.all_relative_paths: + unique_id = f"{request.folder_name}:{rel_path}" + uid_hash = compute_identifier_hash( + DocumentType.LOCAL_FOLDER_FILE.value, + unique_id, + request.search_space_id, + ) + seen_hashes.add(uid_hash) + + all_root_folder_ids: set[int] = set() + if request.root_folder_id: + all_root_folder_ids.add(request.root_folder_id) + + all_db_folders = ( + ( + await session.execute( + select(Folder.id).where( + Folder.search_space_id == request.search_space_id, + ) + ) + ) + .scalars() + .all() + ) + all_root_folder_ids.update(all_db_folders) + + all_folder_docs = ( + ( + await session.execute( + select(Document).where( + Document.document_type == DocumentType.LOCAL_FOLDER_FILE, + Document.search_space_id == request.search_space_id, + Document.folder_id.in_(list(all_root_folder_ids)) + if all_root_folder_ids + else True, + ) + ) + ) + .scalars() + .all() + ) + + deleted_count = 0 + for doc in all_folder_docs: + if doc.unique_identifier_hash not in seen_hashes: + await session.delete(doc) + deleted_count += 1 + + await session.flush() + + if request.root_folder_id: + existing_dirs: set[str] = set() + for rel_path in request.all_relative_paths: + parent = str(os.path.dirname(rel_path)) + if parent and parent != ".": + existing_dirs.add(parent) + + folder_mapping: dict[str, int] = {} + if request.root_folder_id: + folder_mapping[""] = request.root_folder_id + + await _cleanup_empty_folders( + session, + request.root_folder_id, + request.search_space_id, + existing_dirs, + folder_mapping, + ) + + await session.commit() + return {"deleted_count": deleted_count} diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 4e9249d34..62720826f 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -11,7 +11,10 @@ from app.config import config from app.services.notification_service import NotificationService from app.services.task_logging_service import TaskLoggingService from app.tasks.celery_tasks import get_celery_session_maker -from app.tasks.connector_indexers.local_folder_indexer import index_local_folder +from app.tasks.connector_indexers.local_folder_indexer import ( + index_local_folder, + index_uploaded_files, +) from app.tasks.document_processors import ( add_extension_received_document, add_youtube_video_document, @@ -1411,3 +1414,132 @@ async def _index_local_folder_async( heartbeat_task.cancel() if notification_id is not None: _stop_heartbeat(notification_id) + + +# ===== Upload-based folder indexing task ===== + + +@celery_app.task(name="index_uploaded_folder_files", bind=True) +def index_uploaded_folder_files_task( + self, + search_space_id: int, + user_id: str, + folder_name: str, + root_folder_id: int, + enable_summary: bool, + file_mappings: list[dict], +): + """Celery task to index files uploaded from the desktop app.""" + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete( + _index_uploaded_folder_files_async( + search_space_id=search_space_id, + user_id=user_id, + folder_name=folder_name, + root_folder_id=root_folder_id, + enable_summary=enable_summary, + file_mappings=file_mappings, + ) + ) + finally: + loop.close() + + +async def _index_uploaded_folder_files_async( + search_space_id: int, + user_id: str, + folder_name: str, + root_folder_id: int, + enable_summary: bool, + file_mappings: list[dict], +): + """Run upload-based folder indexing with notification + heartbeat.""" + file_count = len(file_mappings) + doc_name = f"{folder_name} ({file_count} file{'s' if file_count != 1 else ''})" + + notification = None + notification_id: int | None = None + heartbeat_task = None + + async with get_celery_session_maker()() as session: + try: + notification = ( + await NotificationService.document_processing.notify_processing_started( + session=session, + user_id=UUID(user_id), + document_type="LOCAL_FOLDER_FILE", + document_name=doc_name, + search_space_id=search_space_id, + ) + ) + notification_id = notification.id + _start_heartbeat(notification_id) + heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification_id)) + except Exception: + logger.warning( + "Failed to create notification for uploaded folder indexing", + exc_info=True, + ) + + async def _heartbeat_progress(completed_count: int) -> None: + if notification: + with contextlib.suppress(Exception): + await NotificationService.document_processing.notify_processing_progress( + session=session, + notification=notification, + stage="indexing", + stage_message=f"Syncing files ({completed_count}/{file_count})", + ) + + try: + _indexed, _failed, err = await index_uploaded_files( + session=session, + search_space_id=search_space_id, + user_id=user_id, + folder_name=folder_name, + root_folder_id=root_folder_id, + enable_summary=enable_summary, + file_mappings=file_mappings, + on_heartbeat_callback=_heartbeat_progress, + ) + + if notification: + try: + await session.refresh(notification) + if err: + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=err, + ) + else: + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + ) + except Exception: + logger.warning( + "Failed to update notification after uploaded folder indexing", + exc_info=True, + ) + + except Exception as e: + logger.exception(f"Uploaded folder indexing failed: {e}") + if notification: + try: + await session.refresh(notification) + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=str(e)[:200], + ) + except Exception: + pass + raise + finally: + if heartbeat_task: + heartbeat_task.cancel() + if notification_id is not None: + _stop_heartbeat(notification_id) diff --git a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py index 7f42f4638..7b433cf62 100644 --- a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py @@ -1081,3 +1081,294 @@ async def _index_single_file( logger.exception(f"Error indexing single file {target_file_path}: {e}") await session.rollback() return 0, 0, str(e) + + +# ======================================================================== +# Upload-based folder indexing (works for all deployment modes) +# ======================================================================== + + +async def _mirror_folder_structure_from_paths( + session: AsyncSession, + relative_paths: list[str], + folder_name: str, + search_space_id: int, + user_id: str, + root_folder_id: int | None = None, +) -> tuple[dict[str, int], int]: + """Create DB Folder rows from a list of relative file paths. + + Unlike ``_mirror_folder_structure`` this does not walk the filesystem; + it derives the directory tree from the paths provided by the client. + + Returns (mapping, root_folder_id) where mapping is + relative_dir_path -> folder_id. The empty-string key maps to root. + """ + dir_set: set[str] = set() + for rp in relative_paths: + parent = str(Path(rp).parent) + if parent == ".": + continue + parts = Path(parent).parts + for i in range(len(parts)): + dir_set.add(str(Path(*parts[: i + 1]))) + + subdirs = sorted(dir_set, key=lambda p: p.count(os.sep)) + + mapping: dict[str, int] = {} + + if root_folder_id: + existing = ( + await session.execute(select(Folder).where(Folder.id == root_folder_id)) + ).scalar_one_or_none() + if existing: + mapping[""] = existing.id + else: + root_folder_id = None + + if not root_folder_id: + root_folder = Folder( + name=folder_name, + search_space_id=search_space_id, + created_by_id=user_id, + position="a0", + ) + session.add(root_folder) + await session.flush() + mapping[""] = root_folder.id + root_folder_id = root_folder.id + + for rel_dir in subdirs: + dir_parts = Path(rel_dir).parts + dir_name = dir_parts[-1] + parent_rel = str(Path(*dir_parts[:-1])) if len(dir_parts) > 1 else "" + + parent_id = mapping.get(parent_rel, mapping[""]) + + existing_folder = ( + await session.execute( + select(Folder).where( + Folder.name == dir_name, + Folder.parent_id == parent_id, + Folder.search_space_id == search_space_id, + ) + ) + ).scalar_one_or_none() + + if existing_folder: + mapping[rel_dir] = existing_folder.id + else: + new_folder = Folder( + name=dir_name, + parent_id=parent_id, + search_space_id=search_space_id, + created_by_id=user_id, + position="a0", + ) + session.add(new_folder) + await session.flush() + mapping[rel_dir] = new_folder.id + + await session.flush() + return mapping, root_folder_id + + +UPLOAD_BATCH_CONCURRENCY = 5 + + +async def index_uploaded_files( + session: AsyncSession, + search_space_id: int, + user_id: str, + folder_name: str, + root_folder_id: int, + enable_summary: bool, + file_mappings: list[dict], + on_heartbeat_callback: HeartbeatCallbackType | None = None, +) -> tuple[int, int, str | None]: + """Index files uploaded from the desktop app via temp paths. + + Each entry in *file_mappings* is ``{temp_path, relative_path, filename}``. + This function mirrors the folder structure from the provided relative + paths, then indexes each file exactly like ``_index_single_file`` but + reads from the temp path. Temp files are cleaned up after processing. + + Returns ``(indexed_count, failed_count, error_summary_or_none)``. + """ + task_logger = TaskLoggingService(session, search_space_id) + log_entry = await task_logger.log_task_start( + task_name="local_folder_indexing", + source="uploaded_folder_indexing", + message=f"Indexing {len(file_mappings)} uploaded file(s) for {folder_name}", + metadata={"file_count": len(file_mappings)}, + ) + + try: + all_relative_paths = [m["relative_path"] for m in file_mappings] + folder_mapping, root_folder_id = await _mirror_folder_structure_from_paths( + session=session, + relative_paths=all_relative_paths, + folder_name=folder_name, + search_space_id=search_space_id, + user_id=user_id, + root_folder_id=root_folder_id, + ) + await session.flush() + + page_limit_service = PageLimitService(session) + pipeline = IndexingPipelineService(session) + llm = await get_user_long_context_llm(session, user_id, search_space_id) + + indexed_count = 0 + failed_count = 0 + errors: list[str] = [] + + for i, mapping in enumerate(file_mappings): + temp_path = mapping["temp_path"] + relative_path = mapping["relative_path"] + filename = mapping["filename"] + + try: + unique_id = f"{folder_name}:{relative_path}" + uid_hash = compute_identifier_hash( + DocumentType.LOCAL_FOLDER_FILE.value, + unique_id, + search_space_id, + ) + + try: + estimated_pages = await _check_page_limit_or_skip( + page_limit_service, user_id, temp_path + ) + except PageLimitExceededError: + logger.warning(f"Page limit exceeded, skipping: {relative_path}") + failed_count += 1 + continue + + try: + content, content_hash = await _compute_file_content_hash( + temp_path, filename, search_space_id + ) + except Exception as e: + logger.warning(f"Could not read {relative_path}: {e}") + failed_count += 1 + errors.append(f"{filename}: {e}") + continue + + if not content.strip(): + failed_count += 1 + continue + + existing = await check_document_by_unique_identifier( + session, uid_hash + ) + + if existing: + if existing.content_hash == content_hash: + meta = dict(existing.document_metadata or {}) + meta["mtime"] = datetime.now(UTC).timestamp() + existing.document_metadata = meta + if not DocumentStatus.is_state( + existing.status, DocumentStatus.READY + ): + existing.status = DocumentStatus.ready() + await session.commit() + continue + + await create_version_snapshot(session, existing) + + connector_doc = _build_connector_doc( + title=filename, + content=content, + relative_path=relative_path, + folder_name=folder_name, + search_space_id=search_space_id, + user_id=user_id, + enable_summary=enable_summary, + ) + + documents = await pipeline.prepare_for_indexing([connector_doc]) + if not documents: + failed_count += 1 + continue + + db_doc = documents[0] + + try: + db_doc.folder_id = await _resolve_folder_for_file( + session, + relative_path, + root_folder_id, + search_space_id, + user_id, + ) + await session.commit() + except IntegrityError: + await session.rollback() + await session.refresh(db_doc) + + await pipeline.index(db_doc, connector_doc, llm) + + await session.refresh(db_doc) + doc_meta = dict(db_doc.document_metadata or {}) + doc_meta["mtime"] = datetime.now(UTC).timestamp() + db_doc.document_metadata = doc_meta + await session.commit() + + if DocumentStatus.is_state(db_doc.status, DocumentStatus.READY): + indexed_count += 1 + final_pages = _compute_final_pages( + page_limit_service, estimated_pages, len(content) + ) + await page_limit_service.update_page_usage( + user_id, final_pages, allow_exceed=True + ) + else: + failed_count += 1 + + if on_heartbeat_callback and (i + 1) % 5 == 0: + await on_heartbeat_callback(i + 1) + + except Exception as e: + logger.exception( + f"Error indexing uploaded file {relative_path}: {e}" + ) + await session.rollback() + failed_count += 1 + errors.append(f"{filename}: {e}") + finally: + try: + os.unlink(temp_path) + except OSError: + pass + + error_summary = None + if errors: + error_summary = ( + f"{failed_count} file(s) failed: " + "; ".join(errors[:5]) + ) + if len(errors) > 5: + error_summary += f" ... and {len(errors) - 5} more" + + await task_logger.log_task_success( + log_entry, + f"Upload indexing complete: {indexed_count} indexed, {failed_count} failed", + {"indexed": indexed_count, "failed": failed_count}, + ) + + return indexed_count, failed_count, error_summary + + except SQLAlchemyError as e: + logger.exception(f"Database error during uploaded file indexing: {e}") + await session.rollback() + await task_logger.log_task_failure( + log_entry, f"DB error: {e}", "Database error", {} + ) + return 0, 0, f"Database error: {e}" + + except Exception as e: + logger.exception(f"Error during uploaded file indexing: {e}") + await task_logger.log_task_failure( + log_entry, f"Error: {e}", "Unexpected error", {} + ) + return 0, 0, str(e) diff --git a/surfsense_desktop/src/ipc/channels.ts b/surfsense_desktop/src/ipc/channels.ts index 39e75f046..1921dcda2 100644 --- a/surfsense_desktop/src/ipc/channels.ts +++ b/surfsense_desktop/src/ipc/channels.ts @@ -30,6 +30,7 @@ export const IPC_CHANNELS = { FOLDER_SYNC_RENDERER_READY: 'folder-sync:renderer-ready', FOLDER_SYNC_GET_PENDING_EVENTS: 'folder-sync:get-pending-events', FOLDER_SYNC_ACK_EVENTS: 'folder-sync:ack-events', + FOLDER_SYNC_LIST_FILES: 'folder-sync:list-files', BROWSE_FILES: 'browse:files', READ_LOCAL_FILES: 'browse:read-local-files', // Auth token sync across windows diff --git a/surfsense_desktop/src/ipc/handlers.ts b/surfsense_desktop/src/ipc/handlers.ts index 200fa75bd..a1d5552c9 100644 --- a/surfsense_desktop/src/ipc/handlers.ts +++ b/surfsense_desktop/src/ipc/handlers.ts @@ -19,6 +19,8 @@ import { markRendererReady, browseFiles, readLocalFiles, + listFolderFiles, + type WatchedFolderConfig, } from '../modules/folder-watcher'; import { getShortcuts, setShortcuts, type ShortcutConfig } from '../modules/shortcuts'; import { getActiveSearchSpaceId, setActiveSearchSpaceId } from '../modules/active-search-space'; @@ -91,6 +93,10 @@ export function registerIpcHandlers(): void { acknowledgeFileEvents(eventIds) ); + ipcMain.handle(IPC_CHANNELS.FOLDER_SYNC_LIST_FILES, (_event, config: WatchedFolderConfig) => + listFolderFiles(config) + ); + ipcMain.handle(IPC_CHANNELS.BROWSE_FILES, () => browseFiles()); ipcMain.handle(IPC_CHANNELS.READ_LOCAL_FILES, (_event, paths: string[]) => diff --git a/surfsense_desktop/src/modules/folder-watcher.ts b/surfsense_desktop/src/modules/folder-watcher.ts index 969dabe97..a39d7855a 100644 --- a/surfsense_desktop/src/modules/folder-watcher.ts +++ b/surfsense_desktop/src/modules/folder-watcher.ts @@ -188,6 +188,31 @@ function walkFolderMtimes(config: WatchedFolderConfig): MtimeMap { return result; } +export interface FolderFileEntry { + relativePath: string; + fullPath: string; + size: number; + mtimeMs: number; +} + +export function listFolderFiles(config: WatchedFolderConfig): FolderFileEntry[] { + const root = config.path; + const mtimeMap = walkFolderMtimes(config); + const entries: FolderFileEntry[] = []; + + for (const [relativePath, mtimeMs] of Object.entries(mtimeMap)) { + const fullPath = path.join(root, relativePath); + try { + const stat = fs.statSync(fullPath); + entries.push({ relativePath, fullPath, size: stat.size, mtimeMs }); + } catch { + // File may have been removed between walk and stat + } + } + + return entries; +} + function getMainWindow(): BrowserWindow | null { const windows = BrowserWindow.getAllWindows(); return windows.length > 0 ? windows[0] : null; diff --git a/surfsense_desktop/src/preload.ts b/surfsense_desktop/src/preload.ts index 4d9537c91..7cc63aea1 100644 --- a/surfsense_desktop/src/preload.ts +++ b/surfsense_desktop/src/preload.ts @@ -64,6 +64,7 @@ contextBridge.exposeInMainWorld('electronAPI', { signalRendererReady: () => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_RENDERER_READY), getPendingFileEvents: () => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_GET_PENDING_EVENTS), acknowledgeFileEvents: (eventIds: string[]) => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_ACK_EVENTS, eventIds), + listFolderFiles: (config: any) => ipcRenderer.invoke(IPC_CHANNELS.FOLDER_SYNC_LIST_FILES, config), // Browse files via native dialog browseFiles: () => ipcRenderer.invoke(IPC_CHANNELS.BROWSE_FILES), diff --git a/surfsense_web/components/layout/ui/sidebar/DocumentsSidebar.tsx b/surfsense_web/components/layout/ui/sidebar/DocumentsSidebar.tsx index c806c61d8..85c3a9897 100644 --- a/surfsense_web/components/layout/ui/sidebar/DocumentsSidebar.tsx +++ b/surfsense_web/components/layout/ui/sidebar/DocumentsSidebar.tsx @@ -23,7 +23,9 @@ import { FolderPickerDialog } from "@/components/documents/FolderPickerDialog"; import { FolderTreeView } from "@/components/documents/FolderTreeView"; import { VersionHistoryDialog } from "@/components/documents/version-history"; import { EXPORT_FILE_EXTENSIONS } from "@/components/shared/ExportMenuItems"; -import { FolderWatchDialog, type SelectedFolder } from "@/components/sources/FolderWatchDialog"; +import { DEFAULT_EXCLUDE_PATTERNS, FolderWatchDialog, type SelectedFolder } from "@/components/sources/FolderWatchDialog"; +import { uploadFolderScan } from "@/lib/folder-sync-upload"; +import { getSupportedExtensionsSet } from "@/lib/supported-extensions"; import { AlertDialog, AlertDialogAction, @@ -304,14 +306,17 @@ export function DocumentsSidebar({ } try { - await documentsApiService.folderIndex(searchSpaceId, { - folder_path: matched.path, - folder_name: matched.name, - search_space_id: searchSpaceId, - root_folder_id: folder.id, - file_extensions: matched.fileExtensions ?? undefined, + toast.info(`Re-scanning folder: ${matched.name}`); + await uploadFolderScan({ + folderPath: matched.path, + folderName: matched.name, + searchSpaceId, + excludePatterns: matched.excludePatterns ?? DEFAULT_EXCLUDE_PATTERNS, + fileExtensions: matched.fileExtensions ?? Array.from(getSupportedExtensionsSet()), + enableSummary: false, + rootFolderId: folder.id, }); - toast.success(`Re-scanning folder: ${matched.name}`); + toast.success(`Re-scan complete: ${matched.name}`); } catch (err) { toast.error((err as Error)?.message || "Failed to re-scan folder"); } diff --git a/surfsense_web/components/sources/FolderWatchDialog.tsx b/surfsense_web/components/sources/FolderWatchDialog.tsx index 1c66ea6b9..8ccf12afe 100644 --- a/surfsense_web/components/sources/FolderWatchDialog.tsx +++ b/surfsense_web/components/sources/FolderWatchDialog.tsx @@ -1,7 +1,7 @@ "use client"; import { X } from "lucide-react"; -import { useCallback, useEffect, useMemo, useState } from "react"; +import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { toast } from "sonner"; import { Button } from "@/components/ui/button"; import { @@ -13,8 +13,8 @@ import { } from "@/components/ui/dialog"; import { Spinner } from "@/components/ui/spinner"; import { Switch } from "@/components/ui/switch"; -import { documentsApiService } from "@/lib/apis/documents-api.service"; import { getSupportedExtensionsSet } from "@/lib/supported-extensions"; +import { type FolderSyncProgress, uploadFolderScan } from "@/lib/folder-sync-upload"; export interface SelectedFolder { path: string; @@ -29,7 +29,7 @@ interface FolderWatchDialogProps { initialFolder?: SelectedFolder | null; } -const DEFAULT_EXCLUDE_PATTERNS = [ +export const DEFAULT_EXCLUDE_PATTERNS = [ ".git", "node_modules", "__pycache__", @@ -48,6 +48,8 @@ export function FolderWatchDialog({ const [selectedFolder, setSelectedFolder] = useState(null); const [shouldSummarize, setShouldSummarize] = useState(false); const [submitting, setSubmitting] = useState(false); + const [progress, setProgress] = useState(null); + const abortRef = useRef(null); useEffect(() => { if (open && initialFolder) { @@ -68,29 +70,38 @@ export function FolderWatchDialog({ setSelectedFolder({ path: folderPath, name: folderName }); }, []); + const handleCancel = useCallback(() => { + abortRef.current?.abort(); + }, []); + const handleSubmit = useCallback(async () => { if (!selectedFolder) return; const api = window.electronAPI; if (!api) return; + const controller = new AbortController(); + abortRef.current = controller; setSubmitting(true); - try { - const result = await documentsApiService.folderIndex(searchSpaceId, { - folder_path: selectedFolder.path, - folder_name: selectedFolder.name, - search_space_id: searchSpaceId, - enable_summary: shouldSummarize, - file_extensions: supportedExtensions, - }); + setProgress(null); - const rootFolderId = (result as { root_folder_id?: number })?.root_folder_id ?? null; + try { + const rootFolderId = await uploadFolderScan({ + folderPath: selectedFolder.path, + folderName: selectedFolder.name, + searchSpaceId, + excludePatterns: DEFAULT_EXCLUDE_PATTERNS, + fileExtensions: supportedExtensions, + enableSummary: shouldSummarize, + onProgress: setProgress, + signal: controller.signal, + }); await api.addWatchedFolder({ path: selectedFolder.path, name: selectedFolder.name, excludePatterns: DEFAULT_EXCLUDE_PATTERNS, fileExtensions: supportedExtensions, - rootFolderId, + rootFolderId: rootFolderId ?? null, searchSpaceId, active: true, }); @@ -98,12 +109,19 @@ export function FolderWatchDialog({ toast.success(`Watching folder: ${selectedFolder.name}`); setSelectedFolder(null); setShouldSummarize(false); + setProgress(null); onOpenChange(false); onSuccess?.(); } catch (err) { - toast.error((err as Error)?.message || "Failed to watch folder"); + if ((err as Error)?.name === "AbortError") { + toast.info("Folder sync cancelled. Partial progress was saved."); + } else { + toast.error((err as Error)?.message || "Failed to watch folder"); + } } finally { + abortRef.current = null; setSubmitting(false); + setProgress(null); } }, [ selectedFolder, @@ -119,12 +137,31 @@ export function FolderWatchDialog({ if (!nextOpen && !submitting) { setSelectedFolder(null); setShouldSummarize(false); + setProgress(null); } onOpenChange(nextOpen); }, [onOpenChange, submitting] ); + const progressLabel = useMemo(() => { + if (!progress) return null; + switch (progress.phase) { + case "listing": + return "Scanning folder..."; + case "checking": + return `Checking ${progress.total} file(s)...`; + case "uploading": + return `Uploading ${progress.uploaded}/${progress.total} file(s)...`; + case "finalizing": + return "Finalizing..."; + case "done": + return "Done!"; + default: + return null; + } + }, [progress]); + return ( @@ -174,14 +211,39 @@ export function FolderWatchDialog({ - + + + ) : ( + )} - + )} diff --git a/surfsense_web/hooks/use-folder-sync.ts b/surfsense_web/hooks/use-folder-sync.ts index 847d0081b..7a85c31fe 100644 --- a/surfsense_web/hooks/use-folder-sync.ts +++ b/surfsense_web/hooks/use-folder-sync.ts @@ -20,12 +20,18 @@ const DEBOUNCE_MS = 2000; const MAX_WAIT_MS = 10_000; const MAX_BATCH_SIZE = 50; +interface FileEntry { + fullPath: string; + relativePath: string; + action: string; +} + interface BatchItem { folderPath: string; folderName: string; searchSpaceId: number; rootFolderId: number | null; - filePaths: string[]; + files: FileEntry[]; ackIds: string[]; } @@ -44,18 +50,40 @@ export function useFolderSync() { while (queueRef.current.length > 0) { const batch = queueRef.current.shift()!; try { - await documentsApiService.folderIndexFiles(batch.searchSpaceId, { - folder_path: batch.folderPath, - folder_name: batch.folderName, - search_space_id: batch.searchSpaceId, - target_file_paths: batch.filePaths, - root_folder_id: batch.rootFolderId, - }); + const addChangeFiles = batch.files.filter((f) => f.action === "add" || f.action === "change"); + const unlinkFiles = batch.files.filter((f) => f.action === "unlink"); + + if (addChangeFiles.length > 0 && electronAPI?.readLocalFiles) { + const fullPaths = addChangeFiles.map((f) => f.fullPath); + const fileDataArr = await electronAPI.readLocalFiles(fullPaths); + + const files: File[] = fileDataArr.map((fd) => { + const blob = new Blob([fd.data], { type: fd.mimeType || "application/octet-stream" }); + return new File([blob], fd.name, { type: blob.type }); + }); + + await documentsApiService.folderUploadFiles(files, { + folder_name: batch.folderName, + search_space_id: batch.searchSpaceId, + relative_paths: addChangeFiles.map((f) => f.relativePath), + root_folder_id: batch.rootFolderId, + }); + } + + if (unlinkFiles.length > 0) { + await documentsApiService.folderNotifyUnlinked({ + folder_name: batch.folderName, + search_space_id: batch.searchSpaceId, + root_folder_id: batch.rootFolderId, + relative_paths: unlinkFiles.map((f) => f.relativePath), + }); + } + if (electronAPI?.acknowledgeFileEvents && batch.ackIds.length > 0) { await electronAPI.acknowledgeFileEvents(batch.ackIds); } } catch (err) { - console.error("[FolderSync] Failed to trigger batch re-index:", err); + console.error("[FolderSync] Failed to process batch:", err); } } processingRef.current = false; @@ -68,10 +96,10 @@ export function useFolderSync() { if (!pending) return; pendingByFolder.current.delete(folderKey); - for (let i = 0; i < pending.filePaths.length; i += MAX_BATCH_SIZE) { + for (let i = 0; i < pending.files.length; i += MAX_BATCH_SIZE) { queueRef.current.push({ ...pending, - filePaths: pending.filePaths.slice(i, i + MAX_BATCH_SIZE), + files: pending.files.slice(i, i + MAX_BATCH_SIZE), ackIds: i === 0 ? pending.ackIds : [], }); } @@ -83,9 +111,14 @@ export function useFolderSync() { const existing = pendingByFolder.current.get(folderKey); if (existing) { - const pathSet = new Set(existing.filePaths); - pathSet.add(event.fullPath); - existing.filePaths = Array.from(pathSet); + const pathSet = new Set(existing.files.map((f) => f.fullPath)); + if (!pathSet.has(event.fullPath)) { + existing.files.push({ + fullPath: event.fullPath, + relativePath: event.relativePath, + action: event.action, + }); + } if (!existing.ackIds.includes(event.id)) { existing.ackIds.push(event.id); } @@ -95,7 +128,11 @@ export function useFolderSync() { folderName: event.folderName, searchSpaceId: event.searchSpaceId, rootFolderId: event.rootFolderId, - filePaths: [event.fullPath], + files: [{ + fullPath: event.fullPath, + relativePath: event.relativePath, + action: event.action, + }], ackIds: [event.id], }); firstEventTime.current.set(folderKey, Date.now()); diff --git a/surfsense_web/lib/apis/documents-api.service.ts b/surfsense_web/lib/apis/documents-api.service.ts index 5961522ec..3018cbc34 100644 --- a/surfsense_web/lib/apis/documents-api.service.ts +++ b/surfsense_web/lib/apis/documents-api.service.ts @@ -453,6 +453,76 @@ class DocumentsApiService { return baseApiService.post(`/api/v1/documents/folder-index-files`, undefined, { body }); }; + folderMtimeCheck = async (body: { + folder_name: string; + search_space_id: number; + files: { relative_path: string; mtime: number }[]; + }): Promise<{ files_to_upload: string[] }> => { + return baseApiService.post(`/api/v1/documents/folder-mtime-check`, undefined, { body }) as unknown as { files_to_upload: string[] }; + }; + + folderUploadFiles = async ( + files: File[], + metadata: { + folder_name: string; + search_space_id: number; + relative_paths: string[]; + root_folder_id?: number | null; + enable_summary?: boolean; + }, + signal?: AbortSignal, + ): Promise<{ message: string; status: string; root_folder_id: number; file_count: number }> => { + const formData = new FormData(); + for (const file of files) { + formData.append("files", file); + } + formData.append("folder_name", metadata.folder_name); + formData.append("search_space_id", String(metadata.search_space_id)); + formData.append("relative_paths", JSON.stringify(metadata.relative_paths)); + if (metadata.root_folder_id != null) { + formData.append("root_folder_id", String(metadata.root_folder_id)); + } + formData.append("enable_summary", String(metadata.enable_summary ?? false)); + + const totalSize = files.reduce((acc, f) => acc + f.size, 0); + const timeoutMs = Math.min(Math.max((totalSize / (1024 * 1024)) * 5000, 30_000), 600_000); + + const controller = new AbortController(); + const timeoutId = setTimeout(() => controller.abort(), timeoutMs); + + if (signal) { + signal.addEventListener("abort", () => controller.abort(), { once: true }); + } + + try { + return await baseApiService.postFormData( + `/api/v1/documents/folder-upload`, + undefined, + { body: formData, signal: controller.signal }, + ) as { message: string; status: string; root_folder_id: number; file_count: number }; + } finally { + clearTimeout(timeoutId); + } + }; + + folderNotifyUnlinked = async (body: { + folder_name: string; + search_space_id: number; + root_folder_id: number | null; + relative_paths: string[]; + }): Promise<{ deleted_count: number }> => { + return baseApiService.post(`/api/v1/documents/folder-unlink`, undefined, { body }) as unknown as { deleted_count: number }; + }; + + folderSyncFinalize = async (body: { + folder_name: string; + search_space_id: number; + root_folder_id: number | null; + all_relative_paths: string[]; + }): Promise<{ deleted_count: number }> => { + return baseApiService.post(`/api/v1/documents/folder-sync-finalize`, undefined, { body }) as unknown as { deleted_count: number }; + }; + getWatchedFolders = async (searchSpaceId: number) => { return baseApiService.get( `/api/v1/documents/watched-folders?search_space_id=${searchSpaceId}`, diff --git a/surfsense_web/lib/folder-sync-upload.ts b/surfsense_web/lib/folder-sync-upload.ts new file mode 100644 index 000000000..ef53c6b29 --- /dev/null +++ b/surfsense_web/lib/folder-sync-upload.ts @@ -0,0 +1,214 @@ +import { documentsApiService } from "@/lib/apis/documents-api.service"; + +const MAX_BATCH_SIZE_BYTES = 20 * 1024 * 1024; // 20 MB +const MAX_BATCH_FILES = 10; +const UPLOAD_CONCURRENCY = 3; + +export interface FolderSyncProgress { + phase: "listing" | "checking" | "uploading" | "finalizing" | "done"; + uploaded: number; + total: number; +} + +export interface FolderSyncParams { + folderPath: string; + folderName: string; + searchSpaceId: number; + excludePatterns: string[]; + fileExtensions: string[]; + enableSummary: boolean; + rootFolderId?: number | null; + onProgress?: (progress: FolderSyncProgress) => void; + signal?: AbortSignal; +} + +function buildBatches( + entries: FolderFileEntry[], +): FolderFileEntry[][] { + const batches: FolderFileEntry[][] = []; + let currentBatch: FolderFileEntry[] = []; + let currentSize = 0; + + for (const entry of entries) { + if (entry.size >= MAX_BATCH_SIZE_BYTES) { + if (currentBatch.length > 0) { + batches.push(currentBatch); + currentBatch = []; + currentSize = 0; + } + batches.push([entry]); + continue; + } + + if ( + currentBatch.length >= MAX_BATCH_FILES || + currentSize + entry.size > MAX_BATCH_SIZE_BYTES + ) { + batches.push(currentBatch); + currentBatch = []; + currentSize = 0; + } + + currentBatch.push(entry); + currentSize += entry.size; + } + + if (currentBatch.length > 0) { + batches.push(currentBatch); + } + + return batches; +} + +async function uploadBatchesWithConcurrency( + batches: FolderFileEntry[][], + params: { + folderName: string; + searchSpaceId: number; + rootFolderId: number | null; + enableSummary: boolean; + signal?: AbortSignal; + onBatchComplete?: (filesInBatch: number) => void; + }, +) { + const api = window.electronAPI; + if (!api) throw new Error("Electron API not available"); + + let batchIdx = 0; + const errors: string[] = []; + + async function processNext(): Promise { + while (true) { + if (params.signal?.aborted) return; + + const idx = batchIdx++; + if (idx >= batches.length) return; + + const batch = batches[idx]; + const fullPaths = batch.map((e) => e.fullPath); + + try { + const fileDataArr = await api.readLocalFiles(fullPaths); + + const files: File[] = fileDataArr.map((fd) => { + const blob = new Blob([fd.data], { type: fd.mimeType || "application/octet-stream" }); + return new File([blob], fd.name, { type: blob.type }); + }); + + await documentsApiService.folderUploadFiles( + files, + { + folder_name: params.folderName, + search_space_id: params.searchSpaceId, + relative_paths: batch.map((e) => e.relativePath), + root_folder_id: params.rootFolderId, + enable_summary: params.enableSummary, + }, + params.signal, + ); + + params.onBatchComplete?.(batch.length); + } catch (err) { + if (params.signal?.aborted) return; + const msg = (err as Error)?.message || "Upload failed"; + errors.push(`Batch ${idx}: ${msg}`); + } + } + } + + const workers = Array.from({ length: Math.min(UPLOAD_CONCURRENCY, batches.length) }, () => processNext()); + await Promise.all(workers); + + if (errors.length > 0 && !params.signal?.aborted) { + console.error("Some batches failed:", errors); + } +} + +/** + * Run a full upload-based folder scan: list files, mtime-check, upload + * changed files in parallel batches, and finalize (delete orphans). + * + * Returns the root_folder_id to pass to addWatchedFolder. + */ +export async function uploadFolderScan(params: FolderSyncParams): Promise { + const api = window.electronAPI; + if (!api) throw new Error("Electron API not available"); + + const { folderPath, folderName, searchSpaceId, excludePatterns, fileExtensions, enableSummary, signal } = params; + let rootFolderId = params.rootFolderId ?? null; + + params.onProgress?.({ phase: "listing", uploaded: 0, total: 0 }); + + if (signal?.aborted) throw new DOMException("Aborted", "AbortError"); + + const allFiles = await api.listFolderFiles({ + path: folderPath, + name: folderName, + excludePatterns, + fileExtensions, + rootFolderId: rootFolderId ?? null, + searchSpaceId, + active: true, + }); + + if (signal?.aborted) throw new DOMException("Aborted", "AbortError"); + + params.onProgress?.({ phase: "checking", uploaded: 0, total: allFiles.length }); + + const mtimeCheckResult = await documentsApiService.folderMtimeCheck({ + folder_name: folderName, + search_space_id: searchSpaceId, + files: allFiles.map((f) => ({ relative_path: f.relativePath, mtime: f.mtimeMs / 1000 })), + }); + + const filesToUpload = mtimeCheckResult.files_to_upload; + const uploadSet = new Set(filesToUpload); + const entriesToUpload = allFiles.filter((f) => uploadSet.has(f.relativePath)); + + if (signal?.aborted) throw new DOMException("Aborted", "AbortError"); + + if (entriesToUpload.length > 0) { + const batches = buildBatches(entriesToUpload); + + let uploaded = 0; + params.onProgress?.({ phase: "uploading", uploaded: 0, total: entriesToUpload.length }); + + await uploadBatchesWithConcurrency(batches, { + folderName, + searchSpaceId, + rootFolderId: rootFolderId ?? null, + enableSummary, + signal, + onBatchComplete: (count) => { + uploaded += count; + params.onProgress?.({ phase: "uploading", uploaded, total: entriesToUpload.length }); + }, + }); + + if (signal?.aborted) throw new DOMException("Aborted", "AbortError"); + + if (!rootFolderId) { + const watchedFolders = await documentsApiService.getWatchedFolders(searchSpaceId); + const folderList = watchedFolders as Array<{ id: number; name: string }> | undefined; + const matched = folderList?.find((f) => f.name === folderName); + if (matched?.id) { + rootFolderId = matched.id; + } + } + } + + if (signal?.aborted) throw new DOMException("Aborted", "AbortError"); + + params.onProgress?.({ phase: "finalizing", uploaded: entriesToUpload.length, total: entriesToUpload.length }); + + await documentsApiService.folderSyncFinalize({ + folder_name: folderName, + search_space_id: searchSpaceId, + root_folder_id: rootFolderId ?? null, + all_relative_paths: allFiles.map((f) => f.relativePath), + }); + + params.onProgress?.({ phase: "done", uploaded: entriesToUpload.length, total: entriesToUpload.length }); + + return rootFolderId; +} diff --git a/surfsense_web/types/window.d.ts b/surfsense_web/types/window.d.ts index eeef51b0b..4373cdaac 100644 --- a/surfsense_web/types/window.d.ts +++ b/surfsense_web/types/window.d.ts @@ -34,6 +34,13 @@ interface LocalFileData { size: number; } +interface FolderFileEntry { + relativePath: string; + fullPath: string; + size: number; + mtimeMs: number; +} + interface ElectronAPI { versions: { electron: string; @@ -82,6 +89,7 @@ interface ElectronAPI { signalRendererReady: () => Promise; getPendingFileEvents: () => Promise; acknowledgeFileEvents: (eventIds: string[]) => Promise<{ acknowledged: number }>; + listFolderFiles: (config: WatchedFolderConfig) => Promise; // Browse files/folders via native dialogs browseFiles: () => Promise; readLocalFiles: (paths: string[]) => Promise;