"""Dropbox API client using Dropbox HTTP API v2.

Also carries the Dropbox file-type helpers (originally
``app/connectors/dropbox/file_types.py``) that decide which entries are
worth downloading and indexing.
"""

import json
import logging
from datetime import UTC, datetime, timedelta
from typing import Any

import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm.attributes import flag_modified

from app.config import config
from app.db import SearchSourceConnector
from app.utils.oauth_security import TokenEncryption

logger = logging.getLogger(__name__)

# RPC calls go to api.dropboxapi.com; upload/download bodies go through
# content.dropboxapi.com, per the Dropbox API v2 host split.
API_BASE = "https://api.dropboxapi.com"
CONTENT_BASE = "https://content.dropboxapi.com"
TOKEN_URL = "https://api.dropboxapi.com/oauth2/token"


class DropboxClient:
    """Client for Dropbox via the HTTP API v2.

    Access/refresh tokens are read from (and written back to) the
    connector's JSON ``config`` column; they are stored encrypted at rest
    whenever ``config.SECRET_KEY`` is set.
    """

    def __init__(self, session: AsyncSession, connector_id: int):
        self._session = session
        self._connector_id = connector_id

    async def _load_connector(self) -> SearchSourceConnector | None:
        """Fetch this client's connector row, or None if it was deleted."""
        result = await self._session.execute(
            select(SearchSourceConnector).filter(
                SearchSourceConnector.id == self._connector_id
            )
        )
        return result.scalars().first()

    async def _mark_auth_expired(self) -> None:
        """Persist an ``auth_expired`` flag on the connector config.

        Factored out of ``_request``/``_content_request``, which previously
        duplicated this 401-handling block verbatim.
        """
        connector = await self._load_connector()
        if connector:
            cfg = connector.config or {}
            cfg["auth_expired"] = True
            connector.config = cfg
            flag_modified(connector, "config")
            await self._session.commit()

    async def _get_valid_token(self) -> str:
        """Return a valid access token, refreshing (and persisting) if needed.

        Raises:
            ValueError: if the connector is missing, or the token is expired
                and no refresh token is available, or the refresh call fails.
        """
        connector = await self._load_connector()
        if not connector:
            raise ValueError(f"Connector {self._connector_id} not found")

        cfg = connector.config or {}
        is_encrypted = cfg.get("_token_encrypted", False)
        token_encryption = (
            TokenEncryption(config.SECRET_KEY) if config.SECRET_KEY else None
        )

        access_token = cfg.get("access_token", "")
        refresh_token = cfg.get("refresh_token")

        if is_encrypted and token_encryption:
            if access_token:
                access_token = token_encryption.decrypt_token(access_token)
            if refresh_token:
                refresh_token = token_encryption.decrypt_token(refresh_token)

        # A stored naive timestamp is treated as UTC.
        expires_at_str = cfg.get("expires_at")
        is_expired = False
        if expires_at_str:
            expires_at = datetime.fromisoformat(expires_at_str)
            if expires_at.tzinfo is None:
                expires_at = expires_at.replace(tzinfo=UTC)
            is_expired = expires_at <= datetime.now(UTC)

        if not is_expired and access_token:
            return access_token

        if not refresh_token:
            cfg["auth_expired"] = True
            connector.config = cfg
            flag_modified(connector, "config")
            await self._session.commit()
            raise ValueError("Dropbox token expired and no refresh token available")

        token_data = await self._refresh_token(refresh_token)

        new_access = token_data["access_token"]
        expires_in = token_data.get("expires_in")

        new_expires_at = None
        if expires_in:
            new_expires_at = datetime.now(UTC) + timedelta(seconds=int(expires_in))

        # Re-encrypt at rest when a SECRET_KEY is configured.
        if token_encryption:
            cfg["access_token"] = token_encryption.encrypt_token(new_access)
        else:
            cfg["access_token"] = new_access

        cfg["expires_at"] = new_expires_at.isoformat() if new_expires_at else None
        cfg["expires_in"] = expires_in
        cfg["_token_encrypted"] = bool(token_encryption)
        cfg.pop("auth_expired", None)

        connector.config = cfg
        flag_modified(connector, "config")
        await self._session.commit()

        return new_access

    async def _refresh_token(self, refresh_token: str) -> dict:
        """Exchange *refresh_token* for a new access token via /oauth2/token."""
        data = {
            "client_id": config.DROPBOX_APP_KEY,
            "client_secret": config.DROPBOX_APP_SECRET,
            "grant_type": "refresh_token",
            "refresh_token": refresh_token,
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                TOKEN_URL,
                data=data,
                headers={"Content-Type": "application/x-www-form-urlencoded"},
                timeout=30.0,
            )
        if resp.status_code != 200:
            error_detail = resp.text
            try:
                error_json = resp.json()
                error_detail = error_json.get("error_description", error_detail)
            except Exception:
                # Non-JSON error body; keep the raw text.
                pass
            raise ValueError(f"Dropbox token refresh failed: {error_detail}")
        return resp.json()

    async def _request(
        self, path: str, json_body: dict | None = None, **kwargs
    ) -> httpx.Response:
        """Make an authenticated RPC request to the Dropbox API.

        Raises:
            ValueError: on 401, after flagging the connector as auth-expired.
        """
        token = await self._get_valid_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        }
        if "headers" in kwargs:
            headers.update(kwargs.pop("headers"))

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{API_BASE}{path}",
                headers=headers,
                json=json_body,
                timeout=60.0,
                **kwargs,
            )

        if resp.status_code == 401:
            await self._mark_auth_expired()
            raise ValueError("Dropbox authentication expired (401)")

        return resp

    async def _content_request(
        self, path: str, api_arg: dict, content: bytes | None = None, **kwargs
    ) -> httpx.Response:
        """Make an authenticated content-upload/download request.

        The API arguments travel in the ``Dropbox-API-Arg`` header (JSON);
        the HTTP body carries the raw file bytes.

        Raises:
            ValueError: on 401, after flagging the connector as auth-expired.
        """
        token = await self._get_valid_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Dropbox-API-Arg": json.dumps(api_arg),
            "Content-Type": "application/octet-stream",
        }
        if "headers" in kwargs:
            headers.update(kwargs.pop("headers"))

        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{CONTENT_BASE}{path}",
                headers=headers,
                content=content or b"",
                timeout=120.0,
                **kwargs,
            )

        if resp.status_code == 401:
            await self._mark_auth_expired()
            raise ValueError("Dropbox authentication expired (401)")

        return resp

    async def list_folder(
        self, path: str = ""
    ) -> tuple[list[dict[str, Any]], str | None]:
        """List all items in a folder. Handles pagination via cursor.

        Returns (items, error); on a pagination failure the items gathered
        so far are returned together with the error message.
        """
        all_items: list[dict[str, Any]] = []

        resp = await self._request(
            "/2/files/list_folder",
            {"path": path, "recursive": False, "include_non_downloadable_files": True},
        )
        if resp.status_code != 200:
            return [], f"Failed to list folder: {resp.status_code} - {resp.text}"

        data = resp.json()
        all_items.extend(data.get("entries", []))

        while data.get("has_more"):
            cursor = data["cursor"]
            resp = await self._request(
                "/2/files/list_folder/continue", {"cursor": cursor}
            )
            if resp.status_code != 200:
                return all_items, f"Pagination failed: {resp.status_code}"
            data = resp.json()
            all_items.extend(data.get("entries", []))

        return all_items, None

    async def get_metadata(
        self, path: str
    ) -> tuple[dict[str, Any] | None, str | None]:
        """Fetch metadata for a single file/folder. Returns (metadata, error)."""
        resp = await self._request("/2/files/get_metadata", {"path": path})
        if resp.status_code != 200:
            return None, f"Failed to get metadata: {resp.status_code} - {resp.text}"
        return resp.json(), None

    async def download_file(self, path: str) -> tuple[bytes | None, str | None]:
        """Download a file fully into memory. Returns (content, error)."""
        resp = await self._content_request(
            "/2/files/download", {"path": path}
        )
        if resp.status_code != 200:
            return None, f"Download failed: {resp.status_code}"
        return resp.content, None

    async def download_file_to_disk(self, path: str, dest_path: str) -> str | None:
        """Stream file content to disk. Returns error message on failure."""
        token = await self._get_valid_token()
        headers = {
            "Authorization": f"Bearer {token}",
            "Dropbox-API-Arg": json.dumps({"path": path}),
        }
        async with (
            httpx.AsyncClient() as client,
            client.stream(
                "POST",
                f"{CONTENT_BASE}/2/files/download",
                headers=headers,
                timeout=120.0,
            ) as resp,
        ):
            if resp.status_code != 200:
                return f"Download failed: {resp.status_code}"
            # 5 MB chunks keep memory bounded for large files.
            with open(dest_path, "wb") as f:
                async for chunk in resp.aiter_bytes(chunk_size=5 * 1024 * 1024):
                    f.write(chunk)
        return None

    async def upload_file(
        self,
        path: str,
        content: bytes,
        mode: str = "add",
        autorename: bool = True,
    ) -> dict[str, Any]:
        """Upload a file to Dropbox (up to 150MB).

        Raises:
            ValueError: on any non-200 response.
        """
        api_arg = {"path": path, "mode": mode, "autorename": autorename}
        resp = await self._content_request("/2/files/upload", api_arg, content)
        if resp.status_code != 200:
            raise ValueError(f"Upload failed: {resp.status_code} - {resp.text}")
        return resp.json()

    async def create_paper_doc(
        self, path: str, markdown_content: str
    ) -> dict[str, Any]:
        """Create a Dropbox Paper document from markdown.

        Raises:
            ValueError: on any non-200 response.
        """
        token = await self._get_valid_token()
        api_arg = {"import_format": "markdown", "path": path}
        headers = {
            "Authorization": f"Bearer {token}",
            "Dropbox-API-Arg": json.dumps(api_arg),
            "Content-Type": "application/octet-stream",
        }
        async with httpx.AsyncClient() as client:
            resp = await client.post(
                f"{API_BASE}/2/files/paper/create",
                headers=headers,
                content=markdown_content.encode("utf-8"),
                timeout=60.0,
            )
        if resp.status_code != 200:
            raise ValueError(
                f"Paper doc creation failed: {resp.status_code} - {resp.text}"
            )
        return resp.json()

    async def delete_file(self, path: str) -> dict[str, Any]:
        """Delete a file or folder.

        Raises:
            ValueError: on any non-200 response.
        """
        resp = await self._request("/2/files/delete_v2", {"path": path})
        if resp.status_code != 200:
            raise ValueError(f"Delete failed: {resp.status_code} - {resp.text}")
        return resp.json()

    async def get_current_account(self) -> tuple[dict[str, Any] | None, str | None]:
        """Get current user's account info. Returns (account, error)."""
        resp = await self._request("/2/users/get_current_account", None)
        if resp.status_code != 200:
            return None, f"Failed to get account: {resp.status_code}"
        return resp.json(), None


# ---------------------------------------------------------------------------
# File type handlers for Dropbox
# (originally app/connectors/dropbox/file_types.py)
# ---------------------------------------------------------------------------

SKIP_EXTENSIONS = frozenset(
    {
        ".paper",  # Dropbox Paper docs are not downloadable via /files/download
    }
)

# MIME type -> canonical file extension for downloaded content.
MIME_TO_EXTENSION: dict[str, str] = {
    "application/pdf": ".pdf",
    "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
    "application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
    "application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
    "application/vnd.ms-excel": ".xls",
    "application/msword": ".doc",
    "application/vnd.ms-powerpoint": ".ppt",
    "text/plain": ".txt",
    "text/csv": ".csv",
    "text/html": ".html",
    "text/markdown": ".md",
    "application/json": ".json",
    "application/xml": ".xml",
    "image/png": ".png",
    "image/jpeg": ".jpg",
}


def get_extension_from_name(name: str) -> str:
    """Extract extension from filename.

    Dotfiles like ``.bashrc`` (dot at position 0) yield "" by design.
    """
    dot = name.rfind(".")
    if dot > 0:
        return name[dot:]
    return ""


def is_folder(item: dict) -> bool:
    """True when a Dropbox entry's ``.tag`` marks it as a folder."""
    return item.get(".tag") == "folder"


def should_skip_file(item: dict) -> bool:
    """Skip folders and non-downloadable files."""
    if is_folder(item):
        return True
    # Entries without an explicit flag are assumed downloadable.
    if not item.get("is_downloadable", True):
        return True
    name = item.get("name", "")
    ext = get_extension_from_name(name).lower()
    return ext in SKIP_EXTENSIONS
"""Dropbox folder traversal, Celery task glue, and the Dropbox indexer.

Reconstructed units, in order:
  * folder listing/lookup helpers (connectors/dropbox/folder_manager.py)
  * Celery entry points (tasks/celery_tasks/connector_tasks.py additions;
    they rely on that module's ``celery_app`` and ``get_celery_session_maker``)
  * the indexer built on the shared IndexingPipelineService
    (tasks/connector_indexers/dropbox_indexer.py)

File-level pre-filter (_should_skip_file) handles content_hash and
server_modified checks. download_and_extract_content() returns markdown
which is fed into ConnectorDocument -> pipeline.
"""

import asyncio
import logging
import time
from collections.abc import Awaitable, Callable
from typing import Any

from sqlalchemy import String, cast, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified

from app.config import config
from app.connectors.dropbox import DropboxClient, download_and_extract_content
from app.connectors.dropbox.file_types import is_folder
from app.connectors.dropbox.file_types import should_skip_file as skip_item
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
    check_document_by_unique_identifier,
    get_connector_by_id,
    update_connector_last_indexed,
)

logger = logging.getLogger(__name__)

# Async callback receiving the count of items completed so far.
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
HEARTBEAT_INTERVAL_SECONDS = 30


# ---------------------------------------------------------------------------
# Folder traversal helpers (folder_manager.py)
# ---------------------------------------------------------------------------


async def list_folder_contents(
    client: DropboxClient,
    path: str = "",
) -> tuple[list[dict[str, Any]], str | None]:
    """List folders and files in a Dropbox folder.

    Returns (items, error). Each item is annotated with an ``isFolder``
    flag and the list is sorted folders-first, then case-insensitively
    by name.
    """
    try:
        items, error = await client.list_folder(path)
        if error:
            return [], error

        for item in items:
            item["isFolder"] = is_folder(item)

        items.sort(key=lambda x: (not x["isFolder"], x.get("name", "").lower()))

        folder_count = sum(1 for item in items if item["isFolder"])
        file_count = len(items) - folder_count
        logger.info(
            f"Listed {len(items)} items ({folder_count} folders, {file_count} files) "
            + (f"in folder {path}" if path else "in root")
        )
        return items, None

    except Exception as e:
        logger.error(f"Error listing folder contents: {e!s}", exc_info=True)
        return [], f"Error listing folder contents: {e!s}"


async def get_files_in_folder(
    client: DropboxClient,
    path: str,
    include_subfolders: bool = True,
) -> tuple[list[dict[str, Any]], str | None]:
    """Get all indexable files in a folder, optionally recursing into subfolders.

    Subfolder errors are logged and skipped rather than aborting the walk.
    """
    try:
        items, error = await client.list_folder(path)
        if error:
            return [], error

        files: list[dict[str, Any]] = []
        for item in items:
            if is_folder(item):
                if not include_subfolders:
                    continue
                sub_path = item.get("path_lower")
                if not sub_path:
                    # FIX: recursing with a missing path_lower used to pass ""
                    # — the Dropbox root — which re-lists everything and can
                    # recurse forever. Skip such entries instead.
                    logger.warning(
                        f"Folder {item.get('name')} has no path_lower; skipping"
                    )
                    continue
                sub_files, sub_error = await get_files_in_folder(
                    client, sub_path, include_subfolders=True
                )
                if sub_error:
                    logger.warning(
                        f"Error recursing into folder {item.get('name')}: {sub_error}"
                    )
                    continue
                files.extend(sub_files)
            elif not skip_item(item):
                files.append(item)

        return files, None

    except Exception as e:
        logger.error(f"Error getting files in folder: {e!s}", exc_info=True)
        return [], f"Error getting files in folder: {e!s}"


async def get_file_by_path(
    client: DropboxClient,
    path: str,
) -> tuple[dict[str, Any] | None, str | None]:
    """Get file metadata by path. Returns (item, error)."""
    try:
        item, error = await client.get_metadata(path)
        if error:
            return None, error
        if not item:
            return None, f"File not found: {path}"
        return item, None

    except Exception as e:
        logger.error(f"Error getting file by path: {e!s}", exc_info=True)
        return None, f"Error getting file by path: {e!s}"


# ---------------------------------------------------------------------------
# Celery glue (connector_tasks.py additions) — ``celery_app`` and
# ``get_celery_session_maker`` come from the enclosing tasks module.
# ---------------------------------------------------------------------------


@celery_app.task(name="index_dropbox_files", bind=True)
def index_dropbox_files_task(
    self,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    items_dict: dict,
):
    """Celery task to index Dropbox folders and files.

    asyncio.run() replaces the previous manual
    new_event_loop()/set_event_loop()/close() dance: it likewise runs the
    coroutine on a brand-new private loop, but additionally cancels any
    leftover tasks and shuts down async generators on exit.
    """
    asyncio.run(
        _index_dropbox_files(
            connector_id,
            search_space_id,
            user_id,
            items_dict,
        )
    )


async def _index_dropbox_files(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    items_dict: dict,
):
    """Index Dropbox folders and files with a fresh Celery-scoped session."""
    # Imported lazily to avoid a circular import with the routes module.
    from app.routes.search_source_connectors_routes import (
        run_dropbox_indexing,
    )

    async with get_celery_session_maker()() as session:
        await run_dropbox_indexing(
            session,
            connector_id,
            search_space_id,
            user_id,
            items_dict,
        )


# ---------------------------------------------------------------------------
# Indexer (dropbox_indexer.py)
# ---------------------------------------------------------------------------


async def _should_skip_file(
    session: AsyncSession,
    file: dict,
    search_space_id: int,
) -> tuple[bool, str | None]:
    """Pre-filter: detect unchanged / rename-only files.

    Returns (skip, reason). Content identity is compared by Dropbox
    ``content_hash`` when both sides have one, else by ``server_modified``
    time; when neither signal is usable the file is re-indexed to be safe.
    """
    file_id = file.get("id", "")
    file_name = file.get("name", "Unknown")

    if skip_item(file):
        return True, "folder/non-downloadable"
    if not file_id:
        return True, "missing file_id"

    primary_hash = compute_identifier_hash(
        DocumentType.DROPBOX_FILE.value, file_id, search_space_id
    )
    existing = await check_document_by_unique_identifier(session, primary_hash)

    if not existing:
        # Legacy rows may predate unique_identifier_hash: fall back to the
        # dropbox_file_id stored in metadata and backfill the hash (persisted
        # on the next session commit).
        result = await session.execute(
            select(Document).where(
                Document.search_space_id == search_space_id,
                Document.document_type == DocumentType.DROPBOX_FILE,
                cast(Document.document_metadata["dropbox_file_id"], String) == file_id,
            )
        )
        existing = result.scalar_one_or_none()
        if existing:
            existing.unique_identifier_hash = primary_hash
            logger.debug(f"Found Dropbox doc by metadata for file_id: {file_id}")

    if not existing:
        return False, None

    incoming_content_hash = file.get("content_hash")
    meta = existing.document_metadata or {}
    stored_content_hash = meta.get("content_hash")

    incoming_mtime = file.get("server_modified")
    stored_mtime = meta.get("modified_time")

    content_unchanged = False
    if incoming_content_hash and stored_content_hash:
        content_unchanged = incoming_content_hash == stored_content_hash
    elif incoming_content_hash and not stored_content_hash:
        return False, None
    elif not incoming_content_hash and incoming_mtime and stored_mtime:
        content_unchanged = incoming_mtime == stored_mtime
    elif not incoming_content_hash:
        return False, None

    if not content_unchanged:
        return False, None

    old_name = meta.get("dropbox_file_name")
    if old_name and old_name != file_name:
        # Rename-only change: patch title/metadata in place, no re-download.
        existing.title = file_name
        if not existing.document_metadata:
            existing.document_metadata = {}
        existing.document_metadata["dropbox_file_name"] = file_name
        if incoming_mtime:
            existing.document_metadata["modified_time"] = incoming_mtime
        flag_modified(existing, "document_metadata")
        await session.commit()
        logger.info(f"Rename-only update: '{old_name}' -> '{file_name}'")
        return True, f"File renamed: '{old_name}' -> '{file_name}'"

    # NOTE(review): unchanged files whose previous run did not reach READY
    # are still skipped here — confirm failed documents are retried elsewhere.
    if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
        return True, "skipped (previously failed)"
    return True, "unchanged"


def _build_connector_doc(
    file: dict,
    markdown: str,
    dropbox_metadata: dict,
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
) -> ConnectorDocument:
    """Assemble a ConnectorDocument for the indexing pipeline from one file."""
    file_id = file.get("id", "")
    file_name = file.get("name", "Unknown")

    metadata = {
        **dropbox_metadata,
        "connector_id": connector_id,
        "document_type": "Dropbox File",
        "connector_type": "Dropbox",
    }

    # Used verbatim when LLM summarization is disabled or fails.
    fallback_summary = f"File: {file_name}\n\n{markdown[:4000]}"

    return ConnectorDocument(
        title=file_name,
        source_markdown=markdown,
        unique_id=file_id,
        document_type=DocumentType.DROPBOX_FILE,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=enable_summary,
        fallback_summary=fallback_summary,
        metadata=metadata,
    )


async def _download_files_parallel(
    dropbox_client: DropboxClient,
    files: list[dict],
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
    max_concurrency: int = 3,
    on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[list[ConnectorDocument], int]:
    """Download and ETL files in parallel. Returns (docs, failed_count)."""
    results: list[ConnectorDocument] = []
    sem = asyncio.Semaphore(max_concurrency)
    last_heartbeat = time.time()
    completed_count = 0
    # Serializes heartbeat bookkeeping across the concurrent downloads.
    hb_lock = asyncio.Lock()

    async def _download_one(file: dict) -> ConnectorDocument | None:
        nonlocal last_heartbeat, completed_count
        async with sem:
            markdown, db_metadata, error = await download_and_extract_content(
                dropbox_client, file
            )
            if error or not markdown:
                file_name = file.get("name", "Unknown")
                reason = error or "empty content"
                logger.warning(f"Download/ETL failed for {file_name}: {reason}")
                return None
            doc = _build_connector_doc(
                file,
                markdown,
                db_metadata,
                connector_id=connector_id,
                search_space_id=search_space_id,
                user_id=user_id,
                enable_summary=enable_summary,
            )
            async with hb_lock:
                completed_count += 1
                if on_heartbeat:
                    now = time.time()
                    if now - last_heartbeat >= HEARTBEAT_INTERVAL_SECONDS:
                        await on_heartbeat(completed_count)
                        last_heartbeat = now
            return doc

    tasks = [_download_one(f) for f in files]
    outcomes = await asyncio.gather(*tasks, return_exceptions=True)

    failed = 0
    for outcome in outcomes:
        if isinstance(outcome, Exception) or outcome is None:
            failed += 1
        else:
            results.append(outcome)

    return results, failed


async def _download_and_index(
    dropbox_client: DropboxClient,
    session: AsyncSession,
    files: list[dict],
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
    on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int]:
    """Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
    connector_docs, download_failed = await _download_files_parallel(
        dropbox_client,
        files,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        enable_summary=enable_summary,
        on_heartbeat=on_heartbeat,
    )

    batch_indexed = 0
    batch_failed = 0
    if connector_docs:
        pipeline = IndexingPipelineService(session)

        async def _get_llm(s):
            return await get_user_long_context_llm(s, user_id, search_space_id)

        _, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
            connector_docs,
            _get_llm,
            max_concurrency=3,
            on_heartbeat=on_heartbeat,
        )

    return batch_indexed, download_failed + batch_failed


async def _index_full_scan(
    dropbox_client: DropboxClient,
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    folder_path: str,
    folder_name: str,
    task_logger: TaskLoggingService,
    log_entry: object,
    max_files: int,
    include_subfolders: bool = True,
    on_heartbeat_callback: HeartbeatCallbackType | None = None,
    enable_summary: bool = True,
) -> tuple[int, int]:
    """Full scan indexing of a folder. Returns (indexed, skipped).

    Download/index failures are logged but not reflected in the return
    value; at most *max_files* entries of the folder listing are considered.
    """
    await task_logger.log_task_progress(
        log_entry,
        f"Starting full scan of folder: {folder_name}",
        {
            "stage": "full_scan",
            "folder_path": folder_path,
            "include_subfolders": include_subfolders,
        },
    )

    renamed_count = 0
    skipped = 0
    files_to_download: list[dict] = []

    all_files, error = await get_files_in_folder(
        dropbox_client,
        folder_path,
        include_subfolders=include_subfolders,
    )
    if error:
        err_lower = error.lower()
        if "401" in error or "authentication expired" in err_lower:
            raise Exception(
                f"Dropbox authentication failed. Please re-authenticate. (Error: {error})"
            )
        raise Exception(f"Failed to list Dropbox files: {error}")

    for file in all_files[:max_files]:
        skip, msg = await _should_skip_file(session, file, search_space_id)
        if skip:
            # Rename-only updates were already persisted and count as indexed.
            if msg and "renamed" in msg.lower():
                renamed_count += 1
            else:
                skipped += 1
            continue
        files_to_download.append(file)

    batch_indexed, failed = await _download_and_index(
        dropbox_client,
        session,
        files_to_download,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        enable_summary=enable_summary,
        on_heartbeat=on_heartbeat_callback,
    )

    indexed = renamed_count + batch_indexed
    logger.info(
        f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
    )
    return indexed, skipped


async def _index_selected_files(
    dropbox_client: DropboxClient,
    session: AsyncSession,
    file_paths: list[tuple[str, str | None]],
    *,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    enable_summary: bool,
    on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
    """Index user-selected files using the parallel pipeline.

    Returns (indexed, skipped, errors) where errors describe files that
    could not be resolved by path.
    """
    files_to_download: list[dict] = []
    errors: list[str] = []
    renamed_count = 0
    skipped = 0

    for file_path, file_name in file_paths:
        file, error = await get_file_by_path(dropbox_client, file_path)
        if error or not file:
            display = file_name or file_path
            errors.append(f"File '{display}': {error or 'File not found'}")
            continue

        skip, msg = await _should_skip_file(session, file, search_space_id)
        if skip:
            if msg and "renamed" in msg.lower():
                renamed_count += 1
            else:
                skipped += 1
            continue

        files_to_download.append(file)

    batch_indexed, _failed = await _download_and_index(
        dropbox_client,
        session,
        files_to_download,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        enable_summary=enable_summary,
        on_heartbeat=on_heartbeat,
    )

    return renamed_count + batch_indexed, skipped, errors


async def index_dropbox_files(
    session: AsyncSession,
    connector_id: int,
    search_space_id: int,
    user_id: str,
    items_dict: dict,
) -> tuple[int, int, str | None]:
    """Index Dropbox files for a specific connector.

    items_dict format:
        {
            "folders": [{"path": "...", "name": "..."}, ...],
            "files": [{"path": "...", "name": "..."}, ...],
            "indexing_options": {"max_files": 500, "include_subfolders": true}
        }

    Returns (total_indexed, total_skipped, error_message).
    """
    task_logger = TaskLoggingService(session, search_space_id)
    log_entry = await task_logger.log_task_start(
        task_name="dropbox_files_indexing",
        source="connector_indexing_task",
        message=f"Starting Dropbox indexing for connector {connector_id}",
        metadata={"connector_id": connector_id, "user_id": str(user_id)},
    )

    try:
        connector = await get_connector_by_id(
            session, connector_id, SearchSourceConnectorType.DROPBOX_CONNECTOR
        )
        if not connector:
            error_msg = f"Dropbox connector with ID {connector_id} not found"
            await task_logger.log_task_failure(
                log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
            )
            return 0, 0, error_msg

        # Encrypted credentials are unusable without the decryption key.
        token_encrypted = connector.config.get("_token_encrypted", False)
        if token_encrypted and not config.SECRET_KEY:
            error_msg = "SECRET_KEY not configured but credentials are encrypted"
            await task_logger.log_task_failure(
                log_entry,
                error_msg,
                "Missing SECRET_KEY",
                {"error_type": "MissingSecretKey"},
            )
            return 0, 0, error_msg

        connector_enable_summary = getattr(connector, "enable_summary", True)
        dropbox_client = DropboxClient(session, connector_id)

        indexing_options = items_dict.get("indexing_options", {})
        max_files = indexing_options.get("max_files", 500)
        include_subfolders = indexing_options.get("include_subfolders", True)

        total_indexed = 0
        total_skipped = 0

        selected_files = items_dict.get("files", [])
        if selected_files:
            file_tuples = [
                (f.get("path", f.get("path_lower", "")), f.get("name"))
                for f in selected_files
            ]
            indexed, skipped, file_errors = await _index_selected_files(
                dropbox_client,
                session,
                file_tuples,
                connector_id=connector_id,
                search_space_id=search_space_id,
                user_id=user_id,
                enable_summary=connector_enable_summary,
            )
            # FIX: these per-file errors were previously discarded silently.
            for err in file_errors:
                logger.warning(f"Dropbox selected-file indexing issue: {err}")
            total_indexed += indexed
            total_skipped += skipped

        folders = items_dict.get("folders", [])
        for folder in folders:
            folder_path = folder.get("path", folder.get("path_lower", ""))
            folder_name = folder.get("name", "Root")

            logger.info(f"Using full scan for folder {folder_name}")
            indexed, skipped = await _index_full_scan(
                dropbox_client,
                session,
                connector_id,
                search_space_id,
                user_id,
                folder_path,
                folder_name,
                task_logger,
                log_entry,
                max_files,
                include_subfolders,
                enable_summary=connector_enable_summary,
            )
            total_indexed += indexed
            total_skipped += skipped

        if total_indexed > 0 or folders:
            await update_connector_last_indexed(session, connector, True)

        await session.commit()

        await task_logger.log_task_success(
            log_entry,
            f"Successfully completed Dropbox indexing for connector {connector_id}",
            {"files_processed": total_indexed, "files_skipped": total_skipped},
        )
        logger.info(
            f"Dropbox indexing completed: {total_indexed} indexed, {total_skipped} skipped"
        )
        return total_indexed, total_skipped, None

    except SQLAlchemyError as db_error:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Database error during Dropbox indexing for connector {connector_id}",
            str(db_error),
            {"error_type": "SQLAlchemyError"},
        )
        logger.error(f"Database error: {db_error!s}", exc_info=True)
        return 0, 0, f"Database error: {db_error!s}"
    except Exception as e:
        await session.rollback()
        await task_logger.log_task_failure(
            log_entry,
            f"Failed to index Dropbox files for connector {connector_id}",
            str(e),
            {"error_type": type(e).__name__},
        )
        logger.error(f"Failed to index Dropbox files: {e!s}", exc_info=True)
        return 0, 0, f"Failed to index Dropbox files: {e!s}"