From b5a15b7681b05ed1d17b7a34ae2c2769caaed4c9 Mon Sep 17 00:00:00 2001
From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com>
Date: Mon, 6 Apr 2026 18:36:29 +0530
Subject: [PATCH] feat: implement cursor-based delta sync for the Dropbox
 connector, preserving folder cursors across re-authentication

---
 .../app/connectors/dropbox/client.py          |  49 +++++
 .../app/connectors/dropbox/file_types.py      |  19 +-
 .../app/routes/dropbox_add_connector_route.py |   2 +
 .../app/services/docling_service.py           |   5 +-
 .../connector_indexers/dropbox_indexer.py     | 192 ++++++++++++++++--
 5 files changed, 249 insertions(+), 18 deletions(-)

diff --git a/surfsense_backend/app/connectors/dropbox/client.py b/surfsense_backend/app/connectors/dropbox/client.py
index dfae38f66..b177c2f8d 100644
--- a/surfsense_backend/app/connectors/dropbox/client.py
+++ b/surfsense_backend/app/connectors/dropbox/client.py
@@ -225,6 +225,55 @@ class DropboxClient:
 
         return all_items, None
 
+    async def get_latest_cursor(
+        self, path: str = ""
+    ) -> tuple[str | None, str | None]:
+        """Get a cursor representing the current state of a folder.
+
+        Uses /2/files/list_folder/get_latest_cursor so we can later call
+        get_changes to receive only incremental updates.
+        """
+        resp = await self._request(
+            "/2/files/list_folder/get_latest_cursor",
+            {"path": path, "recursive": False, "include_non_downloadable_files": True},
+        )
+        if resp.status_code != 200:
+            return None, f"Failed to get cursor: {resp.status_code} - {resp.text}"
+        return resp.json().get("cursor"), None
+
+    async def get_changes(
+        self, cursor: str
+    ) -> tuple[list[dict[str, Any]], str | None, str | None]:
+        """Fetch incremental changes since the given cursor.
+
+        Calls /2/files/list_folder/continue and handles pagination.
+        Returns (entries, new_cursor, error).
+        """
+        all_entries: list[dict[str, Any]] = []
+
+        resp = await self._request(
+            "/2/files/list_folder/continue", {"cursor": cursor}
+        )
+        if resp.status_code == 401:
+            return [], None, "Dropbox authentication expired (401)"
+        if resp.status_code != 200:
+            return [], None, f"Failed to get changes: {resp.status_code} - {resp.text}"
+
+        data = resp.json()
+        all_entries.extend(data.get("entries", []))
+
+        while data.get("has_more"):
+            cursor = data["cursor"]
+            resp = await self._request(
+                "/2/files/list_folder/continue", {"cursor": cursor}
+            )
+            if resp.status_code != 200:
+                # Return the last cursor that succeeded so the caller can resume.
+                return all_entries, data.get("cursor"), f"Pagination failed: {resp.status_code} - {resp.text}"
+            data = resp.json()
+            all_entries.extend(data.get("entries", []))
+
+        return all_entries, data.get("cursor"), None
+
     async def get_metadata(self, path: str) -> tuple[dict[str, Any] | None, str | None]:
         resp = await self._request("/2/files/get_metadata", {"path": path})
         if resp.status_code != 200:
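Note: the intended lifecycle of the two new client methods, as a minimal sketch. Only `get_latest_cursor` and `get_changes` come from this patch; the import path and the availability of a constructed `DropboxClient` are assumptions about the repo layout.

```python
# Illustrative only: baseline a folder once, then consume incremental changes.
from app.connectors.dropbox.client import DropboxClient  # import path assumed


async def sync_folder(client: DropboxClient, path: str = "") -> str:
    # 1. Take a snapshot cursor for the folder's current state.
    cursor, err = await client.get_latest_cursor(path)
    if err or not cursor:
        raise RuntimeError(err or "no cursor returned")

    # 2. On later runs, ask only for what changed since that cursor.
    entries, new_cursor, err = await client.get_changes(cursor)
    if err:
        raise RuntimeError(err)
    for entry in entries:
        print(entry.get(".tag"), entry.get("path_lower"))

    # 3. Persist new_cursor; it is the starting point for the next call.
    return new_cursor or cursor
```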
diff --git a/surfsense_backend/app/connectors/dropbox/file_types.py b/surfsense_backend/app/connectors/dropbox/file_types.py
index e6d772a1c..c245e039e 100644
--- a/surfsense_backend/app/connectors/dropbox/file_types.py
+++ b/surfsense_backend/app/connectors/dropbox/file_types.py
@@ -2,7 +2,24 @@
 
 PAPER_EXTENSION = ".paper"
 
-SKIP_EXTENSIONS: frozenset[str] = frozenset()
+SKIP_EXTENSIONS: frozenset[str] = frozenset({
+    # Non-universal images (not supported by all 3 ETL pipelines)
+    ".svg", ".gif", ".webp", ".heic", ".ico",
+    ".raw", ".cr2", ".nef", ".arw", ".dng",
+    ".psd", ".ai", ".sketch", ".fig",
+    # Video
+    ".mov", ".avi", ".mkv", ".wmv", ".flv",
+    # Binaries / executables
+    ".exe", ".dll", ".so", ".dylib", ".bin", ".app", ".dmg", ".iso",
+    # Archives
+    ".zip", ".tar", ".gz", ".rar", ".7z", ".bz2",
+    # Fonts
+    ".ttf", ".otf", ".woff", ".woff2",
+    # 3D / CAD
+    ".stl", ".obj", ".fbx", ".blend",
+    # Database
+    ".db", ".sqlite", ".mdb",
+})
 
 MIME_TO_EXTENSION: dict[str, str] = {
     "application/pdf": ".pdf",
diff --git a/surfsense_backend/app/routes/dropbox_add_connector_route.py b/surfsense_backend/app/routes/dropbox_add_connector_route.py
index 941e5c00f..1dba64467 100644
--- a/surfsense_backend/app/routes/dropbox_add_connector_route.py
+++ b/surfsense_backend/app/routes/dropbox_add_connector_route.py
@@ -311,9 +311,11 @@ async def dropbox_callback(
         )
 
         existing_cursor = db_connector.config.get("cursor")
+        existing_folder_cursors = db_connector.config.get("folder_cursors")
         db_connector.config = {
             **connector_config,
             "cursor": existing_cursor,
+            "folder_cursors": existing_folder_cursors,
             "auth_expired": False,
         }
         flag_modified(db_connector, "config")
diff --git a/surfsense_backend/app/services/docling_service.py b/surfsense_backend/app/services/docling_service.py
index 82eaf7f74..360c197ed 100644
--- a/surfsense_backend/app/services/docling_service.py
+++ b/surfsense_backend/app/services/docling_service.py
@@ -111,9 +111,10 @@ class DoclingService:
             pipeline_options=pipeline_options, backend=PyPdfiumDocumentBackend
         )
 
-        # Initialize DocumentConverter
+        # Initialize DocumentConverter with PDF and IMAGE support
         self.converter = DocumentConverter(
-            format_options={InputFormat.PDF: pdf_format_option}
+            allowed_formats=[InputFormat.PDF, InputFormat.IMAGE],
+            format_options={InputFormat.PDF: pdf_format_option},
         )
 
         acceleration_type = "GPU (WSL2)" if self.use_gpu else "CPU"
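The DoclingService change above configures the converter for both PDFs and standalone images. A minimal sketch of using a converter configured this way (docling's public API; the file path is a placeholder):

```python
from docling.datamodel.base_models import InputFormat
from docling.document_converter import DocumentConverter

# Same allowed_formats as the patched service; format_options omitted here.
converter = DocumentConverter(allowed_formats=[InputFormat.PDF, InputFormat.IMAGE])
result = converter.convert("scan.png")  # images are accepted alongside PDFs
print(result.document.export_to_markdown())
```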
diff --git a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py
index 1b039add7..7a2f82a78 100644
--- a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py
@@ -250,6 +250,124 @@ async def _download_and_index(
 
     return batch_indexed, download_failed + batch_failed
 
+async def _remove_document(
+    session: AsyncSession, file_id: str, search_space_id: int
+) -> None:
+    """Remove a document whose backing file was deleted in Dropbox."""
+    primary_hash = compute_identifier_hash(
+        DocumentType.DROPBOX_FILE.value, file_id, search_space_id
+    )
+    existing = await check_document_by_unique_identifier(session, primary_hash)
+
+    if not existing:
+        # Fall back to matching on the Dropbox file id stored in metadata.
+        result = await session.execute(
+            select(Document).where(
+                Document.search_space_id == search_space_id,
+                Document.document_type == DocumentType.DROPBOX_FILE,
+                cast(Document.document_metadata["dropbox_file_id"], String)
+                == file_id,
+            )
+        )
+        existing = result.scalar_one_or_none()
+
+    if existing:
+        await session.delete(existing)
+
+
+async def _index_with_delta_sync(
+    dropbox_client: DropboxClient,
+    session: AsyncSession,
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    cursor: str,
+    task_logger: TaskLoggingService,
+    log_entry: object,
+    max_files: int,
+    on_heartbeat_callback: HeartbeatCallbackType | None = None,
+    enable_summary: bool = True,
+) -> tuple[int, int, str]:
+    """Delta sync using Dropbox cursor-based change tracking.
+
+    Returns (indexed_count, skipped_count, new_cursor).
+    """
+    await task_logger.log_task_progress(
+        log_entry,
+        f"Starting delta sync from cursor: {cursor[:20]}...",
+        {"stage": "delta_sync", "cursor_prefix": cursor[:20]},
+    )
+
+    entries, new_cursor, error = await dropbox_client.get_changes(cursor)
+    if error:
+        err_lower = error.lower()
+        if "401" in error or "authentication expired" in err_lower:
+            raise Exception(
+                f"Dropbox authentication failed. Please re-authenticate. (Error: {error})"
+            )
+        raise Exception(f"Failed to fetch Dropbox changes: {error}")
+
+    if not entries:
+        logger.info("No changes detected since last sync")
+        return 0, 0, new_cursor or cursor
+
+    logger.info(f"Processing {len(entries)} change entries")
+
+    renamed_count = 0
+    skipped = 0
+    files_to_download: list[dict] = []
+    files_processed = 0
+
+    for entry in entries:
+        # max_files caps how many change entries we examine per run.
+        if files_processed >= max_files:
+            break
+        files_processed += 1
+
+        tag = entry.get(".tag")
+
+        if tag == "deleted":
+            path_lower = entry.get("path_lower", "")
+            name = entry.get("name", "")
+            # Dropbox "deleted" entries (DeletedMetadata) carry no "id" field;
+            # when it is absent we cannot resolve the stored document and the
+            # deletion is skipped.
+            file_id = entry.get("id", "")
+            if file_id:
+                await _remove_document(session, file_id, search_space_id)
+                logger.debug(f"Processed deletion: {name or path_lower}")
+            continue
+
+        if tag != "file":
+            continue
+
+        if skip_item(entry):
+            skipped += 1
+            continue
+
+        skip, msg = await _should_skip_file(session, entry, search_space_id)
+        if skip:
+            if msg and "renamed" in msg.lower():
+                renamed_count += 1
+            else:
+                skipped += 1
+            continue
+
+        files_to_download.append(entry)
+
+    batch_indexed, failed = await _download_and_index(
+        dropbox_client,
+        session,
+        files_to_download,
+        connector_id=connector_id,
+        search_space_id=search_space_id,
+        user_id=user_id,
+        enable_summary=enable_summary,
+        on_heartbeat=on_heartbeat_callback,
+    )
+
+    indexed = renamed_count + batch_indexed
+    logger.info(
+        f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
+    )
+    return indexed, skipped, new_cursor or cursor
+
+
 async def _index_full_scan(
     dropbox_client: DropboxClient,
     session: AsyncSession,
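For reference while reading the `.tag` dispatch above: abridged change entries as Dropbox returns them from `list_folder/continue`. The field set follows Dropbox's documented FileMetadata/FolderMetadata/DeletedMetadata; the values are invented. Note the deleted entry has no `id`, which is why unresolvable deletions are skipped.

```python
entries = [
    {
        ".tag": "file",
        "name": "notes.pdf",
        "path_lower": "/reports/notes.pdf",
        "id": "id:a4ayc_80_OEAAAAAAAAAXw",  # invented
        "server_modified": "2026-04-06T12:00:00Z",
        "rev": "015f0e2...",  # truncated, invented
        "size": 20480,
    },
    {".tag": "folder", "name": "Archive", "path_lower": "/archive"},
    # DeletedMetadata: no "id", no "rev" -- only the path identifies it.
    {".tag": "deleted", "name": "old.docx", "path_lower": "/reports/old.docx"},
]
```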
@@ -437,6 +555,9 @@ async def index_dropbox_files(
     max_files = indexing_options.get("max_files", 500)
     incremental_sync = indexing_options.get("incremental_sync", True)
     include_subfolders = indexing_options.get("include_subfolders", True)
+    use_delta_sync = indexing_options.get("use_delta_sync", True)
+
+    folder_cursors: dict = connector.config.get("folder_cursors", {})
 
     total_indexed = 0
     total_skipped = 0
@@ -471,25 +592,66 @@ async def index_dropbox_files(
         )
 
         folder_name = folder.get("name", "Root")
-        logger.info(f"Using full scan for folder {folder_name}")
-        indexed, skipped = await _index_full_scan(
-            dropbox_client,
-            session,
-            connector_id,
-            search_space_id,
-            user_id,
-            folder_path,
-            folder_name,
-            task_logger,
-            log_entry,
-            max_files,
-            include_subfolders,
-            incremental_sync=incremental_sync,
-            enable_summary=connector_enable_summary,
+        saved_cursor = folder_cursors.get(folder_path)
+        can_use_delta = (
+            use_delta_sync
+            and saved_cursor
+            and connector.last_indexed_at
         )
+
+        if can_use_delta:
+            logger.info(f"Using delta sync for folder {folder_name}")
+            indexed, skipped, new_cursor = await _index_with_delta_sync(
+                dropbox_client,
+                session,
+                connector_id,
+                search_space_id,
+                user_id,
+                saved_cursor,
+                task_logger,
+                log_entry,
+                max_files,
+                enable_summary=connector_enable_summary,
+            )
+            folder_cursors[folder_path] = new_cursor
+        else:
+            logger.info(f"Using full scan for folder {folder_name}")
+            indexed, skipped = await _index_full_scan(
+                dropbox_client,
+                session,
+                connector_id,
+                search_space_id,
+                user_id,
+                folder_path,
+                folder_name,
+                task_logger,
+                log_entry,
+                max_files,
+                include_subfolders,
+                incremental_sync=incremental_sync,
+                enable_summary=connector_enable_summary,
+            )
+            # Capture a baseline cursor only after a full scan. After a delta
+            # sync the cursor returned by get_changes is authoritative, and
+            # overwriting it with a fresh get_latest_cursor here could
+            # silently skip changes that arrived in between.
+            try:
+                latest_cursor, cursor_err = await dropbox_client.get_latest_cursor(
+                    folder_path
+                )
+                if latest_cursor and not cursor_err:
+                    folder_cursors[folder_path] = latest_cursor
+            except Exception as e:
+                logger.warning(f"Failed to get latest cursor for {folder_path}: {e}")
+
+        total_indexed += indexed
+        total_skipped += skipped
 
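The persist step below copies the config dict and calls `flag_modified` because SQLAlchemy does not detect in-place mutation of a JSON column. A minimal standalone sketch of the pattern (the `connector` ORM instance and its JSON `config` column are assumed, as in this patch):

```python
from sqlalchemy.orm.attributes import flag_modified

cfg = dict(connector.config)            # copy instead of mutating in place
cfg["folder_cursors"] = folder_cursors  # stage the new per-folder cursors
connector.config = cfg                  # rebind the attribute on the instance
flag_modified(connector, "config")      # force the UPDATE even if change tracking missed it
```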
+
+    # Persist folder cursors to connector config
+    if folders:
+        cfg = dict(connector.config)
+        cfg["folder_cursors"] = folder_cursors
+        connector.config = cfg
+        flag_modified(connector, "config")
+
     if total_indexed > 0 or folders:
         await update_connector_last_indexed(session, connector, True)
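For completeness, the options surface this patch reads. An illustrative `indexing_options` payload (keys come from the patch; the values shown are the defaults it falls back to):

```python
indexing_options = {
    "max_files": 500,
    "incremental_sync": True,
    "include_subfolders": True,
    "use_delta_sync": True,  # new in this patch: set False to force full scans
}
```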