diff --git a/surfsense_backend/app/connectors/dropbox/content_extractor.py b/surfsense_backend/app/connectors/dropbox/content_extractor.py new file mode 100644 index 000000000..9e83f3474 --- /dev/null +++ b/surfsense_backend/app/connectors/dropbox/content_extractor.py @@ -0,0 +1,74 @@ +"""Content extraction for Dropbox files. + +Reuses the same ETL parsing logic as OneDrive/Google Drive since file parsing +is extension-based, not provider-specific. +""" + +import contextlib +import logging +import os +import tempfile +from pathlib import Path +from typing import Any + +from .client import DropboxClient +from .file_types import get_extension_from_name, should_skip_file + +logger = logging.getLogger(__name__) + + +async def download_and_extract_content( + client: DropboxClient, + file: dict[str, Any], +) -> tuple[str | None, dict[str, Any], str | None]: + """Download a Dropbox file and extract its content as markdown. + + Returns (markdown_content, dropbox_metadata, error_message). + """ + file_path_lower = file.get("path_lower", "") + file_name = file.get("name", "Unknown") + file_id = file.get("id", "") + + if should_skip_file(file): + return None, {}, "Skipping non-indexable item" + + logger.info(f"Downloading file for content extraction: {file_name}") + + metadata: dict[str, Any] = { + "dropbox_file_id": file_id, + "dropbox_file_name": file_name, + "dropbox_path": file_path_lower, + "source_connector": "dropbox", + } + + if "server_modified" in file: + metadata["modified_time"] = file["server_modified"] + if "client_modified" in file: + metadata["created_time"] = file["client_modified"] + if "size" in file: + metadata["file_size"] = file["size"] + if "content_hash" in file: + metadata["content_hash"] = file["content_hash"] + + temp_file_path = None + try: + extension = get_extension_from_name(file_name) or ".bin" + with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp: + temp_file_path = tmp.name + + error = await client.download_file_to_disk(file_path_lower, temp_file_path) + if error: + return None, metadata, error + + from app.connectors.onedrive.content_extractor import _parse_file_to_markdown + + markdown = await _parse_file_to_markdown(temp_file_path, file_name) + return markdown, metadata, None + + except Exception as e: + logger.warning(f"Failed to extract content from {file_name}: {e!s}") + return None, metadata, str(e) + finally: + if temp_file_path and os.path.exists(temp_file_path): + with contextlib.suppress(Exception): + os.unlink(temp_file_path) diff --git a/surfsense_backend/app/routes/dropbox_add_connector_route.py b/surfsense_backend/app/routes/dropbox_add_connector_route.py new file mode 100644 index 000000000..8dcaf8c1c --- /dev/null +++ b/surfsense_backend/app/routes/dropbox_add_connector_route.py @@ -0,0 +1,569 @@ +""" +Dropbox Connector OAuth Routes. + +Endpoints: +- GET /auth/dropbox/connector/add - Initiate OAuth +- GET /auth/dropbox/connector/callback - Handle OAuth callback +- GET /auth/dropbox/connector/reauth - Re-authenticate existing connector +- GET /connectors/{connector_id}/dropbox/folders - List folder contents +""" + +import logging +from datetime import UTC, datetime, timedelta +from urllib.parse import urlencode +from uuid import UUID + +import httpx +from fastapi import APIRouter, Depends, HTTPException +from fastapi.responses import RedirectResponse +from sqlalchemy.exc import IntegrityError +from sqlalchemy.ext.asyncio import AsyncSession +from sqlalchemy.future import select +from sqlalchemy.orm.attributes import flag_modified + +from app.config import config +from app.connectors.dropbox import DropboxClient, list_folder_contents +from app.db import ( + SearchSourceConnector, + SearchSourceConnectorType, + User, + get_async_session, +) +from app.users import current_active_user +from app.utils.connector_naming import ( + check_duplicate_connector, + extract_identifier_from_credentials, + generate_unique_connector_name, +) +from app.utils.oauth_security import OAuthStateManager, TokenEncryption + +logger = logging.getLogger(__name__) +router = APIRouter() + +AUTHORIZATION_URL = "https://www.dropbox.com/oauth2/authorize" +TOKEN_URL = "https://api.dropboxapi.com/oauth2/token" + +_state_manager = None +_token_encryption = None + + +def get_state_manager() -> OAuthStateManager: + global _state_manager + if _state_manager is None: + if not config.SECRET_KEY: + raise ValueError("SECRET_KEY must be set for OAuth security") + _state_manager = OAuthStateManager(config.SECRET_KEY) + return _state_manager + + +def get_token_encryption() -> TokenEncryption: + global _token_encryption + if _token_encryption is None: + if not config.SECRET_KEY: + raise ValueError("SECRET_KEY must be set for token encryption") + _token_encryption = TokenEncryption(config.SECRET_KEY) + return _token_encryption + + +@router.get("/auth/dropbox/connector/add") +async def connect_dropbox(space_id: int, user: User = Depends(current_active_user)): + """Initiate Dropbox OAuth flow.""" + try: + if not space_id: + raise HTTPException(status_code=400, detail="space_id is required") + if not config.DROPBOX_APP_KEY: + raise HTTPException( + status_code=500, detail="Dropbox OAuth not configured." + ) + if not config.SECRET_KEY: + raise HTTPException( + status_code=500, detail="SECRET_KEY not configured for OAuth security." + ) + + state_manager = get_state_manager() + state_encoded = state_manager.generate_secure_state(space_id, user.id) + + auth_params = { + "client_id": config.DROPBOX_APP_KEY, + "response_type": "code", + "redirect_uri": config.DROPBOX_REDIRECT_URI, + "state": state_encoded, + "token_access_type": "offline", + } + auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}" + + logger.info( + "Generated Dropbox OAuth URL for user %s, space %s", user.id, space_id + ) + return {"auth_url": auth_url} + + except HTTPException: + raise + except Exception as e: + logger.error("Failed to initiate Dropbox OAuth: %s", str(e), exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to initiate Dropbox OAuth: {e!s}" + ) from e + + +@router.get("/auth/dropbox/connector/reauth") +async def reauth_dropbox( + space_id: int, + connector_id: int, + return_url: str | None = None, + user: User = Depends(current_active_user), + session: AsyncSession = Depends(get_async_session), +): + """Re-authenticate an existing Dropbox connector.""" + try: + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == connector_id, + SearchSourceConnector.user_id == user.id, + SearchSourceConnector.search_space_id == space_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.DROPBOX_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + raise HTTPException( + status_code=404, detail="Dropbox connector not found or access denied" + ) + + if not config.SECRET_KEY: + raise HTTPException( + status_code=500, detail="SECRET_KEY not configured for OAuth security." + ) + + state_manager = get_state_manager() + extra: dict = {"connector_id": connector_id} + if return_url and return_url.startswith("/"): + extra["return_url"] = return_url + state_encoded = state_manager.generate_secure_state(space_id, user.id, **extra) + + auth_params = { + "client_id": config.DROPBOX_APP_KEY, + "response_type": "code", + "redirect_uri": config.DROPBOX_REDIRECT_URI, + "state": state_encoded, + "token_access_type": "offline", + "force_reapprove": "true", + } + auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}" + + logger.info( + "Initiating Dropbox re-auth for user %s, connector %s", + user.id, + connector_id, + ) + return {"auth_url": auth_url} + + except HTTPException: + raise + except Exception as e: + logger.error("Failed to initiate Dropbox re-auth: %s", str(e), exc_info=True) + raise HTTPException( + status_code=500, detail=f"Failed to initiate Dropbox re-auth: {e!s}" + ) from e + + +@router.get("/auth/dropbox/connector/callback") +async def dropbox_callback( + code: str | None = None, + error: str | None = None, + error_description: str | None = None, + state: str | None = None, + session: AsyncSession = Depends(get_async_session), +): + """Handle Dropbox OAuth callback.""" + try: + if error: + error_msg = error_description or error + logger.warning("Dropbox OAuth error: %s", error_msg) + space_id = None + if state: + try: + data = get_state_manager().validate_state(state) + space_id = data.get("space_id") + except Exception: + pass + if space_id: + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=dropbox_oauth_denied" + ) + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=dropbox_oauth_denied" + ) + + if not code or not state: + raise HTTPException( + status_code=400, detail="Missing required OAuth parameters" + ) + + state_manager = get_state_manager() + try: + data = state_manager.validate_state(state) + space_id = data["space_id"] + user_id = UUID(data["user_id"]) + except (HTTPException, ValueError, KeyError) as e: + logger.error("Invalid OAuth state: %s", str(e)) + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=invalid_state" + ) + + reauth_connector_id = data.get("connector_id") + reauth_return_url = data.get("return_url") + + token_data = { + "client_id": config.DROPBOX_APP_KEY, + "client_secret": config.DROPBOX_APP_SECRET, + "code": code, + "redirect_uri": config.DROPBOX_REDIRECT_URI, + "grant_type": "authorization_code", + } + + async with httpx.AsyncClient() as client: + token_response = await client.post( + TOKEN_URL, + data=token_data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=30.0, + ) + + if token_response.status_code != 200: + error_detail = token_response.text + try: + error_json = token_response.json() + error_detail = error_json.get("error_description", error_detail) + except Exception: + pass + raise HTTPException( + status_code=400, detail=f"Token exchange failed: {error_detail}" + ) + + token_json = token_response.json() + access_token = token_json.get("access_token") + refresh_token = token_json.get("refresh_token") + + if not access_token: + raise HTTPException( + status_code=400, detail="No access token received from Dropbox" + ) + + token_encryption = get_token_encryption() + + expires_at = None + if token_json.get("expires_in"): + expires_at = datetime.now(UTC) + timedelta( + seconds=int(token_json["expires_in"]) + ) + + user_info: dict = {} + try: + async with httpx.AsyncClient() as client: + user_response = await client.post( + "https://api.dropboxapi.com/2/users/get_current_account", + headers={ + "Authorization": f"Bearer {access_token}", + "Content-Type": "application/json", + }, + content=b"null", + timeout=30.0, + ) + if user_response.status_code == 200: + user_data = user_response.json() + user_info = { + "user_email": user_data.get("email"), + "user_name": user_data.get("name", {}).get("display_name"), + "account_id": user_data.get("account_id"), + } + except Exception as e: + logger.warning("Failed to fetch user info from Dropbox: %s", str(e)) + + connector_config = { + "access_token": token_encryption.encrypt_token(access_token), + "refresh_token": token_encryption.encrypt_token(refresh_token) + if refresh_token + else None, + "token_type": token_json.get("token_type", "bearer"), + "expires_in": token_json.get("expires_in"), + "expires_at": expires_at.isoformat() if expires_at else None, + "user_email": user_info.get("user_email"), + "user_name": user_info.get("user_name"), + "account_id": user_info.get("account_id"), + "_token_encrypted": True, + } + + if reauth_connector_id: + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == reauth_connector_id, + SearchSourceConnector.user_id == user_id, + SearchSourceConnector.search_space_id == space_id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.DROPBOX_CONNECTOR, + ) + ) + db_connector = result.scalars().first() + if not db_connector: + raise HTTPException( + status_code=404, + detail="Connector not found or access denied during re-auth", + ) + + existing_cursor = db_connector.config.get("cursor") + db_connector.config = { + **connector_config, + "cursor": existing_cursor, + "auth_expired": False, + } + flag_modified(db_connector, "config") + await session.commit() + await session.refresh(db_connector) + + logger.info( + "Re-authenticated Dropbox connector %s for user %s", + db_connector.id, + user_id, + ) + if reauth_return_url and reauth_return_url.startswith("/"): + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}{reauth_return_url}" + ) + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=DROPBOX_CONNECTOR&connectorId={db_connector.id}" + ) + + connector_identifier = extract_identifier_from_credentials( + SearchSourceConnectorType.DROPBOX_CONNECTOR, connector_config + ) + is_duplicate = await check_duplicate_connector( + session, + SearchSourceConnectorType.DROPBOX_CONNECTOR, + space_id, + user_id, + connector_identifier, + ) + if is_duplicate: + logger.warning( + "Duplicate Dropbox connector for user %s, space %s", user_id, space_id + ) + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=duplicate_account&connector=DROPBOX_CONNECTOR" + ) + + connector_name = await generate_unique_connector_name( + session, + SearchSourceConnectorType.DROPBOX_CONNECTOR, + space_id, + user_id, + connector_identifier, + ) + + new_connector = SearchSourceConnector( + name=connector_name, + connector_type=SearchSourceConnectorType.DROPBOX_CONNECTOR, + is_indexable=True, + config=connector_config, + search_space_id=space_id, + user_id=user_id, + ) + + try: + session.add(new_connector) + await session.commit() + await session.refresh(new_connector) + logger.info( + "Successfully created Dropbox connector %s for user %s", + new_connector.id, + user_id, + ) + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=DROPBOX_CONNECTOR&connectorId={new_connector.id}" + ) + except IntegrityError as e: + await session.rollback() + logger.error( + "Database integrity error creating Dropbox connector: %s", str(e) + ) + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=connector_creation_failed" + ) + + except HTTPException: + raise + except (IntegrityError, ValueError) as e: + logger.error("Dropbox OAuth callback error: %s", str(e), exc_info=True) + return RedirectResponse( + url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=dropbox_auth_error" + ) + + +@router.get("/connectors/{connector_id}/dropbox/folders") +async def list_dropbox_folders( + connector_id: int, + parent_path: str = "", + session: AsyncSession = Depends(get_async_session), + user: User = Depends(current_active_user), +): + """List folders and files in user's Dropbox.""" + connector = None + try: + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.id == connector_id, + SearchSourceConnector.user_id == user.id, + SearchSourceConnector.connector_type + == SearchSourceConnectorType.DROPBOX_CONNECTOR, + ) + ) + connector = result.scalars().first() + if not connector: + raise HTTPException( + status_code=404, detail="Dropbox connector not found or access denied" + ) + + dropbox_client = DropboxClient(session, connector_id) + items, error = await list_folder_contents(dropbox_client, path=parent_path) + + if error: + error_lower = error.lower() + if ( + "401" in error + or "authentication expired" in error_lower + or "expired_access_token" in error_lower + ): + try: + if connector and not connector.config.get("auth_expired"): + connector.config = {**connector.config, "auth_expired": True} + flag_modified(connector, "config") + await session.commit() + except Exception: + logger.warning( + "Failed to persist auth_expired for connector %s", + connector_id, + exc_info=True, + ) + raise HTTPException( + status_code=400, + detail="Dropbox authentication expired. Please re-authenticate.", + ) + raise HTTPException( + status_code=500, detail=f"Failed to list folder contents: {error}" + ) + + return {"items": items} + + except HTTPException: + raise + except Exception as e: + logger.error("Error listing Dropbox contents: %s", str(e), exc_info=True) + error_lower = str(e).lower() + if "401" in str(e) or "authentication expired" in error_lower: + try: + if connector and not connector.config.get("auth_expired"): + connector.config = {**connector.config, "auth_expired": True} + flag_modified(connector, "config") + await session.commit() + except Exception: + pass + raise HTTPException( + status_code=400, + detail="Dropbox authentication expired. Please re-authenticate.", + ) from e + raise HTTPException( + status_code=500, detail=f"Failed to list Dropbox contents: {e!s}" + ) from e + + +async def refresh_dropbox_token( + session: AsyncSession, connector: SearchSourceConnector +) -> SearchSourceConnector: + """Refresh Dropbox OAuth tokens.""" + logger.info("Refreshing Dropbox OAuth tokens for connector %s", connector.id) + + token_encryption = get_token_encryption() + is_encrypted = connector.config.get("_token_encrypted", False) + refresh_token = connector.config.get("refresh_token") + + if is_encrypted and refresh_token: + try: + refresh_token = token_encryption.decrypt_token(refresh_token) + except Exception as e: + logger.error("Failed to decrypt refresh token: %s", str(e)) + raise HTTPException( + status_code=500, detail="Failed to decrypt stored refresh token" + ) from e + + if not refresh_token: + raise HTTPException( + status_code=400, + detail=f"No refresh token available for connector {connector.id}", + ) + + refresh_data = { + "client_id": config.DROPBOX_APP_KEY, + "client_secret": config.DROPBOX_APP_SECRET, + "grant_type": "refresh_token", + "refresh_token": refresh_token, + } + + async with httpx.AsyncClient() as client: + token_response = await client.post( + TOKEN_URL, + data=refresh_data, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + timeout=30.0, + ) + + if token_response.status_code != 200: + error_detail = token_response.text + error_code = "" + try: + error_json = token_response.json() + error_detail = error_json.get("error_description", error_detail) + error_code = error_json.get("error", "") + except Exception: + pass + error_lower = (error_detail + error_code).lower() + if ( + "invalid_grant" in error_lower + or "expired" in error_lower + or "revoked" in error_lower + ): + raise HTTPException( + status_code=401, + detail="Dropbox authentication failed. Please re-authenticate.", + ) + raise HTTPException( + status_code=400, detail=f"Token refresh failed: {error_detail}" + ) + + token_json = token_response.json() + access_token = token_json.get("access_token") + + if not access_token: + raise HTTPException( + status_code=400, detail="No access token received from Dropbox refresh" + ) + + expires_at = None + expires_in = token_json.get("expires_in") + if expires_in: + expires_at = datetime.now(UTC) + timedelta(seconds=int(expires_in)) + + cfg = dict(connector.config) + cfg["access_token"] = token_encryption.encrypt_token(access_token) + cfg["expires_in"] = expires_in + cfg["expires_at"] = expires_at.isoformat() if expires_at else None + cfg["_token_encrypted"] = True + cfg.pop("auth_expired", None) + + connector.config = cfg + flag_modified(connector, "config") + await session.commit() + await session.refresh(connector) + + logger.info("Successfully refreshed Dropbox tokens for connector %s", connector.id) + return connector diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index d12fa3745..3b2a8f210 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1046,6 +1046,52 @@ async def index_connector_content( ) response_message = "OneDrive indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.DROPBOX_CONNECTOR: + from app.tasks.celery_tasks.connector_tasks import ( + index_dropbox_files_task, + ) + + if drive_items and drive_items.has_items(): + logger.info( + f"Triggering Dropbox indexing for connector {connector_id} into search space {search_space_id}, " + f"folders: {len(drive_items.folders)}, files: {len(drive_items.files)}" + ) + items_dict = drive_items.model_dump() + else: + config = connector.config or {} + selected_folders = config.get("selected_folders", []) + selected_files = config.get("selected_files", []) + if not selected_folders and not selected_files: + raise HTTPException( + status_code=400, + detail="Dropbox indexing requires folders or files to be configured. " + "Please select folders/files to index.", + ) + indexing_options = config.get( + "indexing_options", + { + "max_files_per_folder": 100, + "include_subfolders": True, + }, + ) + items_dict = { + "folders": selected_folders, + "files": selected_files, + "indexing_options": indexing_options, + } + logger.info( + f"Triggering Dropbox indexing for connector {connector_id} into search space {search_space_id} " + f"using existing config" + ) + + index_dropbox_files_task.delay( + connector_id, + search_space_id, + str(user.id), + items_dict, + ) + response_message = "Dropbox indexing started in the background." + elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR: from app.tasks.celery_tasks.connector_tasks import ( index_discord_messages_task, @@ -2644,6 +2690,114 @@ async def run_onedrive_indexing( logger.error(f"Failed to update notification: {notif_error!s}") +async def run_dropbox_indexing( + session: AsyncSession, + connector_id: int, + search_space_id: int, + user_id: str, + items_dict: dict, +): + """Runs the Dropbox indexing task for folders and files with notifications.""" + from uuid import UUID + + notification = None + try: + from app.tasks.connector_indexers.dropbox_indexer import index_dropbox_files + + connector_result = await session.execute( + select(SearchSourceConnector).where( + SearchSourceConnector.id == connector_id + ) + ) + connector = connector_result.scalar_one_or_none() + + if connector: + notification = await NotificationService.connector_indexing.notify_google_drive_indexing_started( + session=session, + user_id=UUID(user_id), + connector_id=connector_id, + connector_name=connector.name, + connector_type=connector.connector_type.value, + search_space_id=search_space_id, + folder_count=len(items_dict.get("folders", [])), + file_count=len(items_dict.get("files", [])), + folder_names=[ + f.get("name", "Unknown") for f in items_dict.get("folders", []) + ], + file_names=[ + f.get("name", "Unknown") for f in items_dict.get("files", []) + ], + ) + + if notification: + await NotificationService.connector_indexing.notify_indexing_progress( + session=session, + notification=notification, + indexed_count=0, + stage="fetching", + ) + + total_indexed, total_skipped, error_message = await index_dropbox_files( + session, + connector_id, + search_space_id, + user_id, + items_dict, + ) + + if error_message: + logger.error( + f"Dropbox indexing completed with errors for connector {connector_id}: {error_message}" + ) + if _is_auth_error(error_message): + await _persist_auth_expired(session, connector_id) + error_message = ( + "Dropbox authentication expired. Please re-authenticate." + ) + else: + if notification: + await session.refresh(notification) + await NotificationService.connector_indexing.notify_indexing_progress( + session=session, + notification=notification, + indexed_count=total_indexed, + stage="storing", + ) + + logger.info( + f"Dropbox indexing successful for connector {connector_id}. Indexed {total_indexed} documents." + ) + await _update_connector_timestamp_by_id(session, connector_id) + await session.commit() + + if notification: + await session.refresh(notification) + await NotificationService.connector_indexing.notify_indexing_completed( + session=session, + notification=notification, + indexed_count=total_indexed, + error_message=error_message, + skipped_count=total_skipped, + ) + + except Exception as e: + logger.error( + f"Critical error in run_dropbox_indexing for connector {connector_id}: {e}", + exc_info=True, + ) + if notification: + try: + await session.refresh(notification) + await NotificationService.connector_indexing.notify_indexing_completed( + session=session, + notification=notification, + indexed_count=0, + error_message=str(e), + ) + except Exception as notif_error: + logger.error(f"Failed to update notification: {notif_error!s}") + + # Add new helper functions for luma indexing async def run_luma_indexing_with_new_session( connector_id: int,