feat: implement cursor-based delta sync for Dropbox integration, enhancing file indexing efficiency and preserving folder cursors during re-authentication

This commit is contained in:
Anish Sarkar 2026-04-06 18:36:29 +05:30
parent be622c417c
commit b5a15b7681
5 changed files with 249 additions and 18 deletions

View file

@ -225,6 +225,55 @@ class DropboxClient:
return all_items, None
async def get_latest_cursor(
self, path: str = ""
) -> tuple[str | None, str | None]:
"""Get a cursor representing the current state of a folder.
Uses /2/files/list_folder/get_latest_cursor so we can later call
get_changes to receive only incremental updates.
"""
resp = await self._request(
"/2/files/list_folder/get_latest_cursor",
{"path": path, "recursive": False, "include_non_downloadable_files": True},
)
if resp.status_code != 200:
return None, f"Failed to get cursor: {resp.status_code} - {resp.text}"
return resp.json().get("cursor"), None
async def get_changes(
self, cursor: str
) -> tuple[list[dict[str, Any]], str | None, str | None]:
"""Fetch incremental changes since the given cursor.
Calls /2/files/list_folder/continue and handles pagination.
Returns (entries, new_cursor, error).
"""
all_entries: list[dict[str, Any]] = []
resp = await self._request(
"/2/files/list_folder/continue", {"cursor": cursor}
)
if resp.status_code == 401:
return [], None, "Dropbox authentication expired (401)"
if resp.status_code != 200:
return [], None, f"Failed to get changes: {resp.status_code} - {resp.text}"
data = resp.json()
all_entries.extend(data.get("entries", []))
while data.get("has_more"):
cursor = data["cursor"]
resp = await self._request(
"/2/files/list_folder/continue", {"cursor": cursor}
)
if resp.status_code != 200:
return all_entries, data.get("cursor"), f"Pagination failed: {resp.status_code}"
data = resp.json()
all_entries.extend(data.get("entries", []))
return all_entries, data.get("cursor"), None
async def get_metadata(self, path: str) -> tuple[dict[str, Any] | None, str | None]:
resp = await self._request("/2/files/get_metadata", {"path": path})
if resp.status_code != 200:

View file

@ -2,7 +2,24 @@
# Dot-prefixed file suffix constant (presumably the Dropbox Paper document
# extension — confirm against the code that consumes it).
PAPER_EXTENSION = ".paper"
# NOTE(review): this empty assignment is immediately overwritten by the
# populated set below; it looks like the pre-change line of the diff.
SKIP_EXTENSIONS: frozenset[str] = frozenset()
# Lower-case, dot-prefixed extensions grouped by category. Presumably files
# with these suffixes are excluded from indexing — verify against the caller.
SKIP_EXTENSIONS: frozenset[str] = frozenset({
# Non-universal images (not supported by all 3 ETL pipelines)
".svg", ".gif", ".webp", ".heic", ".ico",
".raw", ".cr2", ".nef", ".arw", ".dng",
".psd", ".ai", ".sketch", ".fig",
# Video
".mov", ".avi", ".mkv", ".wmv", ".flv",
# Binaries / executables
".exe", ".dll", ".so", ".dylib", ".bin", ".app", ".dmg", ".iso",
# Archives
".zip", ".tar", ".gz", ".rar", ".7z", ".bz2",
# Fonts
".ttf", ".otf", ".woff", ".woff2",
# 3D / CAD
".stl", ".obj", ".fbx", ".blend",
# Database
".db", ".sqlite", ".mdb",
})
MIME_TO_EXTENSION: dict[str, str] = {
"application/pdf": ".pdf",

View file

@ -311,9 +311,11 @@ async def dropbox_callback(
)
existing_cursor = db_connector.config.get("cursor")
existing_folder_cursors = db_connector.config.get("folder_cursors")
db_connector.config = {
**connector_config,
"cursor": existing_cursor,
"folder_cursors": existing_folder_cursors,
"auth_expired": False,
}
flag_modified(db_connector, "config")

View file

@ -111,9 +111,10 @@ class DoclingService:
pipeline_options=pipeline_options, backend=PyPdfiumDocumentBackend
)
# Initialize DocumentConverter
# Initialize DocumentConverter with PDF and IMAGE support
self.converter = DocumentConverter(
format_options={InputFormat.PDF: pdf_format_option}
allowed_formats=[InputFormat.PDF, InputFormat.IMAGE],
format_options={InputFormat.PDF: pdf_format_option},
)
acceleration_type = "GPU (WSL2)" if self.use_gpu else "CPU"

View file

@ -250,6 +250,124 @@ async def _download_and_index(
return batch_indexed, download_failed + batch_failed
async def _remove_document(
    session: AsyncSession, file_id: str, search_space_id: int
):
    """Delete the indexed document that corresponds to a removed Dropbox file.

    The document is first looked up by the identifier hash built from the
    Dropbox file type, ``file_id`` and ``search_space_id``. If that finds
    nothing, a second lookup matches Dropbox documents in the same search
    space whose metadata ``dropbox_file_id`` equals ``file_id``. When neither
    lookup finds a document the function does nothing. No commit is issued
    in this function.
    """
    identifier_hash = compute_identifier_hash(
        DocumentType.DROPBOX_FILE.value, file_id, search_space_id
    )
    document = await check_document_by_unique_identifier(session, identifier_hash)

    if not document:
        # Fallback: match on the file id stored in the document metadata.
        metadata_file_id = cast(
            Document.document_metadata["dropbox_file_id"], String
        )
        fallback_query = select(Document).where(
            Document.search_space_id == search_space_id,
            Document.document_type == DocumentType.DROPBOX_FILE,
            metadata_file_id == file_id,
        )
        lookup = await session.execute(fallback_query)
        document = lookup.scalar_one_or_none()

    if not document:
        return
    await session.delete(document)
async def _index_with_delta_sync(
    dropbox_client: DropboxClient,
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    cursor: str,
    task_logger: TaskLoggingService,
    log_entry: object,
    max_files: int,
    on_heartbeat_callback: HeartbeatCallbackType | None = None,
    enable_summary: bool = True,
) -> tuple[int, int, str]:
    """Delta sync using Dropbox cursor-based change tracking.

    Fetches the change entries recorded since ``cursor``, removes documents
    for deleted entries, filters file entries through ``skip_item`` and
    ``_should_skip_file``, and hands the remaining files to
    ``_download_and_index``.

    Args:
        dropbox_client: Client used to fetch the change list (also passed on
            to the download step).
        session: Database session used for lookups and deletions.
        connector_id: Forwarded to the download/index step.
        search_space_id: Search space the documents belong to.
        user_id: Forwarded to the download/index step.
        cursor: Cursor saved by the previous sync of this folder.
        task_logger: Receives the initial progress entry.
        log_entry: Log entry handle passed to ``task_logger``.
        max_files: Maximum number of change entries examined in this run;
            entries of every kind count toward it.
        on_heartbeat_callback: Optional callback forwarded to the download
            step.
        enable_summary: Forwarded to the download/index step.

    Returns:
        ``(indexed_count, skipped_count, new_cursor)``. ``indexed_count``
        includes files reported as renamed. ``new_cursor`` falls back to the
        input ``cursor`` when the client returned none.

    Raises:
        Exception: When fetching changes fails; authentication failures get
            a dedicated re-authenticate message.
    """
    await task_logger.log_task_progress(
        log_entry,
        f"Starting delta sync from cursor: {cursor[:20]}...",
        {"stage": "delta_sync", "cursor_prefix": cursor[:20]},
    )

    entries, new_cursor, error = await dropbox_client.get_changes(cursor)
    if error:
        err_lower = error.lower()
        if "401" in error or "authentication expired" in err_lower:
            raise Exception(
                f"Dropbox authentication failed. Please re-authenticate. (Error: {error})"
            )
        raise Exception(f"Failed to fetch Dropbox changes: {error}")

    if not entries:
        logger.info("No changes detected since last sync")
        return 0, 0, new_cursor or cursor

    logger.info(f"Processing {len(entries)} change entries")

    renamed_count = 0
    skipped = 0
    files_to_download: list[dict] = []

    for position, entry in enumerate(entries):
        if position >= max_files:
            # NOTE(review): the cursor returned below already points past
            # these entries, so they are not replayed by the next delta
            # sync. Make the truncation visible instead of silent.
            logger.warning(
                f"Delta sync reached max_files={max_files}; "
                f"{len(entries) - position} change entries were not processed"
            )
            break

        tag = entry.get(".tag")

        if tag == "deleted":
            path_lower = entry.get("path_lower", "")
            name = entry.get("name", "")
            file_id = entry.get("id", "")
            if file_id:
                await _remove_document(session, file_id, search_space_id)
                logger.debug(f"Processed deletion: {name or path_lower}")
            else:
                # NOTE(review): the Dropbox API reference lists no "id" on
                # deleted entries (only name/path fields) — confirm; if so,
                # a path-based lookup is needed to remove the document.
                logger.warning(
                    f"Deletion entry without id for {name or path_lower}; "
                    "indexed document was not removed"
                )
            continue

        if tag != "file":
            # Folder entries and unknown tags carry nothing to index.
            continue

        if skip_item(entry):
            skipped += 1
            continue

        skip, msg = await _should_skip_file(session, entry, search_space_id)
        if skip:
            # A rename is counted as indexed work rather than as a skip.
            if msg and "renamed" in msg.lower():
                renamed_count += 1
            else:
                skipped += 1
            continue

        files_to_download.append(entry)

    batch_indexed, failed = await _download_and_index(
        dropbox_client,
        session,
        files_to_download,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        enable_summary=enable_summary,
        on_heartbeat=on_heartbeat_callback,
    )

    indexed = renamed_count + batch_indexed
    logger.info(
        f"Delta sync complete: {indexed} indexed, {skipped} skipped, {failed} failed"
    )
    return indexed, skipped, new_cursor or cursor
async def _index_full_scan(
dropbox_client: DropboxClient,
session: AsyncSession,
@ -437,6 +555,9 @@ async def index_dropbox_files(
max_files = indexing_options.get("max_files", 500)
incremental_sync = indexing_options.get("incremental_sync", True)
include_subfolders = indexing_options.get("include_subfolders", True)
use_delta_sync = indexing_options.get("use_delta_sync", True)
folder_cursors: dict = connector.config.get("folder_cursors", {})
total_indexed = 0
total_skipped = 0
@ -471,25 +592,66 @@ async def index_dropbox_files(
)
folder_name = folder.get("name", "Root")
logger.info(f"Using full scan for folder {folder_name}")
indexed, skipped = await _index_full_scan(
dropbox_client,
session,
connector_id,
search_space_id,
user_id,
folder_path,
folder_name,
task_logger,
log_entry,
max_files,
include_subfolders,
incremental_sync=incremental_sync,
enable_summary=connector_enable_summary,
saved_cursor = folder_cursors.get(folder_path)
can_use_delta = (
use_delta_sync
and saved_cursor
and connector.last_indexed_at
)
if can_use_delta:
logger.info(f"Using delta sync for folder {folder_name}")
indexed, skipped, new_cursor = await _index_with_delta_sync(
dropbox_client,
session,
connector_id,
search_space_id,
user_id,
saved_cursor,
task_logger,
log_entry,
max_files,
enable_summary=connector_enable_summary,
)
folder_cursors[folder_path] = new_cursor
else:
logger.info(f"Using full scan for folder {folder_name}")
indexed, skipped = await _index_full_scan(
dropbox_client,
session,
connector_id,
search_space_id,
user_id,
folder_path,
folder_name,
task_logger,
log_entry,
max_files,
include_subfolders,
incremental_sync=incremental_sync,
enable_summary=connector_enable_summary,
)
total_indexed += indexed
total_skipped += skipped
# Persist latest cursor for this folder
try:
latest_cursor, cursor_err = await dropbox_client.get_latest_cursor(
folder_path
)
if latest_cursor and not cursor_err:
folder_cursors[folder_path] = latest_cursor
except Exception as e:
logger.warning(f"Failed to get latest cursor for {folder_path}: {e}")
# Persist folder cursors to connector config
if folders:
cfg = dict(connector.config)
cfg["folder_cursors"] = folder_cursors
connector.config = cfg
flag_modified(connector, "config")
if total_indexed > 0 or folders:
await update_connector_last_indexed(session, connector, True)