feat: add Dropbox content extraction and OAuth routes for improved file indexing and integration

This commit is contained in:
Anish Sarkar 2026-03-30 22:18:55 +05:30
parent 1f12151e03
commit af115ddae4
3 changed files with 797 additions and 0 deletions

View file

@ -0,0 +1,74 @@
"""Content extraction for Dropbox files.
Reuses the same ETL parsing logic as OneDrive/Google Drive since file parsing
is extension-based, not provider-specific.
"""
import contextlib
import logging
import os
import tempfile
from pathlib import Path
from typing import Any
from .client import DropboxClient
from .file_types import get_extension_from_name, should_skip_file
logger = logging.getLogger(__name__)
async def download_and_extract_content(
client: DropboxClient,
file: dict[str, Any],
) -> tuple[str | None, dict[str, Any], str | None]:
"""Download a Dropbox file and extract its content as markdown.
Returns (markdown_content, dropbox_metadata, error_message).
"""
file_path_lower = file.get("path_lower", "")
file_name = file.get("name", "Unknown")
file_id = file.get("id", "")
if should_skip_file(file):
return None, {}, "Skipping non-indexable item"
logger.info(f"Downloading file for content extraction: {file_name}")
metadata: dict[str, Any] = {
"dropbox_file_id": file_id,
"dropbox_file_name": file_name,
"dropbox_path": file_path_lower,
"source_connector": "dropbox",
}
if "server_modified" in file:
metadata["modified_time"] = file["server_modified"]
if "client_modified" in file:
metadata["created_time"] = file["client_modified"]
if "size" in file:
metadata["file_size"] = file["size"]
if "content_hash" in file:
metadata["content_hash"] = file["content_hash"]
temp_file_path = None
try:
extension = get_extension_from_name(file_name) or ".bin"
with tempfile.NamedTemporaryFile(delete=False, suffix=extension) as tmp:
temp_file_path = tmp.name
error = await client.download_file_to_disk(file_path_lower, temp_file_path)
if error:
return None, metadata, error
from app.connectors.onedrive.content_extractor import _parse_file_to_markdown
markdown = await _parse_file_to_markdown(temp_file_path, file_name)
return markdown, metadata, None
except Exception as e:
logger.warning(f"Failed to extract content from {file_name}: {e!s}")
return None, metadata, str(e)
finally:
if temp_file_path and os.path.exists(temp_file_path):
with contextlib.suppress(Exception):
os.unlink(temp_file_path)

View file

@ -0,0 +1,569 @@
"""
Dropbox Connector OAuth Routes.
Endpoints:
- GET /auth/dropbox/connector/add - Initiate OAuth
- GET /auth/dropbox/connector/callback - Handle OAuth callback
- GET /auth/dropbox/connector/reauth - Re-authenticate existing connector
- GET /connectors/{connector_id}/dropbox/folders - List folder contents
"""
import logging
from datetime import UTC, datetime, timedelta
from urllib.parse import urlencode
from uuid import UUID
import httpx
from fastapi import APIRouter, Depends, HTTPException
from fastapi.responses import RedirectResponse
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm.attributes import flag_modified
from app.config import config
from app.connectors.dropbox import DropboxClient, list_folder_contents
from app.db import (
SearchSourceConnector,
SearchSourceConnectorType,
User,
get_async_session,
)
from app.users import current_active_user
from app.utils.connector_naming import (
check_duplicate_connector,
extract_identifier_from_credentials,
generate_unique_connector_name,
)
from app.utils.oauth_security import OAuthStateManager, TokenEncryption
logger = logging.getLogger(__name__)
router = APIRouter()
AUTHORIZATION_URL = "https://www.dropbox.com/oauth2/authorize"
TOKEN_URL = "https://api.dropboxapi.com/oauth2/token"
_state_manager = None
_token_encryption = None
def get_state_manager() -> OAuthStateManager:
global _state_manager
if _state_manager is None:
if not config.SECRET_KEY:
raise ValueError("SECRET_KEY must be set for OAuth security")
_state_manager = OAuthStateManager(config.SECRET_KEY)
return _state_manager
def get_token_encryption() -> TokenEncryption:
global _token_encryption
if _token_encryption is None:
if not config.SECRET_KEY:
raise ValueError("SECRET_KEY must be set for token encryption")
_token_encryption = TokenEncryption(config.SECRET_KEY)
return _token_encryption
@router.get("/auth/dropbox/connector/add")
async def connect_dropbox(space_id: int, user: User = Depends(current_active_user)):
"""Initiate Dropbox OAuth flow."""
try:
if not space_id:
raise HTTPException(status_code=400, detail="space_id is required")
if not config.DROPBOX_APP_KEY:
raise HTTPException(
status_code=500, detail="Dropbox OAuth not configured."
)
if not config.SECRET_KEY:
raise HTTPException(
status_code=500, detail="SECRET_KEY not configured for OAuth security."
)
state_manager = get_state_manager()
state_encoded = state_manager.generate_secure_state(space_id, user.id)
auth_params = {
"client_id": config.DROPBOX_APP_KEY,
"response_type": "code",
"redirect_uri": config.DROPBOX_REDIRECT_URI,
"state": state_encoded,
"token_access_type": "offline",
}
auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}"
logger.info(
"Generated Dropbox OAuth URL for user %s, space %s", user.id, space_id
)
return {"auth_url": auth_url}
except HTTPException:
raise
except Exception as e:
logger.error("Failed to initiate Dropbox OAuth: %s", str(e), exc_info=True)
raise HTTPException(
status_code=500, detail=f"Failed to initiate Dropbox OAuth: {e!s}"
) from e
@router.get("/auth/dropbox/connector/reauth")
async def reauth_dropbox(
space_id: int,
connector_id: int,
return_url: str | None = None,
user: User = Depends(current_active_user),
session: AsyncSession = Depends(get_async_session),
):
"""Re-authenticate an existing Dropbox connector."""
try:
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.DROPBOX_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
raise HTTPException(
status_code=404, detail="Dropbox connector not found or access denied"
)
if not config.SECRET_KEY:
raise HTTPException(
status_code=500, detail="SECRET_KEY not configured for OAuth security."
)
state_manager = get_state_manager()
extra: dict = {"connector_id": connector_id}
if return_url and return_url.startswith("/"):
extra["return_url"] = return_url
state_encoded = state_manager.generate_secure_state(space_id, user.id, **extra)
auth_params = {
"client_id": config.DROPBOX_APP_KEY,
"response_type": "code",
"redirect_uri": config.DROPBOX_REDIRECT_URI,
"state": state_encoded,
"token_access_type": "offline",
"force_reapprove": "true",
}
auth_url = f"{AUTHORIZATION_URL}?{urlencode(auth_params)}"
logger.info(
"Initiating Dropbox re-auth for user %s, connector %s",
user.id,
connector_id,
)
return {"auth_url": auth_url}
except HTTPException:
raise
except Exception as e:
logger.error("Failed to initiate Dropbox re-auth: %s", str(e), exc_info=True)
raise HTTPException(
status_code=500, detail=f"Failed to initiate Dropbox re-auth: {e!s}"
) from e
@router.get("/auth/dropbox/connector/callback")
async def dropbox_callback(
code: str | None = None,
error: str | None = None,
error_description: str | None = None,
state: str | None = None,
session: AsyncSession = Depends(get_async_session),
):
"""Handle Dropbox OAuth callback."""
try:
if error:
error_msg = error_description or error
logger.warning("Dropbox OAuth error: %s", error_msg)
space_id = None
if state:
try:
data = get_state_manager().validate_state(state)
space_id = data.get("space_id")
except Exception:
pass
if space_id:
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=dropbox_oauth_denied"
)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=dropbox_oauth_denied"
)
if not code or not state:
raise HTTPException(
status_code=400, detail="Missing required OAuth parameters"
)
state_manager = get_state_manager()
try:
data = state_manager.validate_state(state)
space_id = data["space_id"]
user_id = UUID(data["user_id"])
except (HTTPException, ValueError, KeyError) as e:
logger.error("Invalid OAuth state: %s", str(e))
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=invalid_state"
)
reauth_connector_id = data.get("connector_id")
reauth_return_url = data.get("return_url")
token_data = {
"client_id": config.DROPBOX_APP_KEY,
"client_secret": config.DROPBOX_APP_SECRET,
"code": code,
"redirect_uri": config.DROPBOX_REDIRECT_URI,
"grant_type": "authorization_code",
}
async with httpx.AsyncClient() as client:
token_response = await client.post(
TOKEN_URL,
data=token_data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=30.0,
)
if token_response.status_code != 200:
error_detail = token_response.text
try:
error_json = token_response.json()
error_detail = error_json.get("error_description", error_detail)
except Exception:
pass
raise HTTPException(
status_code=400, detail=f"Token exchange failed: {error_detail}"
)
token_json = token_response.json()
access_token = token_json.get("access_token")
refresh_token = token_json.get("refresh_token")
if not access_token:
raise HTTPException(
status_code=400, detail="No access token received from Dropbox"
)
token_encryption = get_token_encryption()
expires_at = None
if token_json.get("expires_in"):
expires_at = datetime.now(UTC) + timedelta(
seconds=int(token_json["expires_in"])
)
user_info: dict = {}
try:
async with httpx.AsyncClient() as client:
user_response = await client.post(
"https://api.dropboxapi.com/2/users/get_current_account",
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
},
content=b"null",
timeout=30.0,
)
if user_response.status_code == 200:
user_data = user_response.json()
user_info = {
"user_email": user_data.get("email"),
"user_name": user_data.get("name", {}).get("display_name"),
"account_id": user_data.get("account_id"),
}
except Exception as e:
logger.warning("Failed to fetch user info from Dropbox: %s", str(e))
connector_config = {
"access_token": token_encryption.encrypt_token(access_token),
"refresh_token": token_encryption.encrypt_token(refresh_token)
if refresh_token
else None,
"token_type": token_json.get("token_type", "bearer"),
"expires_in": token_json.get("expires_in"),
"expires_at": expires_at.isoformat() if expires_at else None,
"user_email": user_info.get("user_email"),
"user_name": user_info.get("user_name"),
"account_id": user_info.get("account_id"),
"_token_encrypted": True,
}
if reauth_connector_id:
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == reauth_connector_id,
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.search_space_id == space_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.DROPBOX_CONNECTOR,
)
)
db_connector = result.scalars().first()
if not db_connector:
raise HTTPException(
status_code=404,
detail="Connector not found or access denied during re-auth",
)
existing_cursor = db_connector.config.get("cursor")
db_connector.config = {
**connector_config,
"cursor": existing_cursor,
"auth_expired": False,
}
flag_modified(db_connector, "config")
await session.commit()
await session.refresh(db_connector)
logger.info(
"Re-authenticated Dropbox connector %s for user %s",
db_connector.id,
user_id,
)
if reauth_return_url and reauth_return_url.startswith("/"):
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}{reauth_return_url}"
)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=DROPBOX_CONNECTOR&connectorId={db_connector.id}"
)
connector_identifier = extract_identifier_from_credentials(
SearchSourceConnectorType.DROPBOX_CONNECTOR, connector_config
)
is_duplicate = await check_duplicate_connector(
session,
SearchSourceConnectorType.DROPBOX_CONNECTOR,
space_id,
user_id,
connector_identifier,
)
if is_duplicate:
logger.warning(
"Duplicate Dropbox connector for user %s, space %s", user_id, space_id
)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?error=duplicate_account&connector=DROPBOX_CONNECTOR"
)
connector_name = await generate_unique_connector_name(
session,
SearchSourceConnectorType.DROPBOX_CONNECTOR,
space_id,
user_id,
connector_identifier,
)
new_connector = SearchSourceConnector(
name=connector_name,
connector_type=SearchSourceConnectorType.DROPBOX_CONNECTOR,
is_indexable=True,
config=connector_config,
search_space_id=space_id,
user_id=user_id,
)
try:
session.add(new_connector)
await session.commit()
await session.refresh(new_connector)
logger.info(
"Successfully created Dropbox connector %s for user %s",
new_connector.id,
user_id,
)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard/{space_id}/connectors/callback?success=true&connector=DROPBOX_CONNECTOR&connectorId={new_connector.id}"
)
except IntegrityError as e:
await session.rollback()
logger.error(
"Database integrity error creating Dropbox connector: %s", str(e)
)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=connector_creation_failed"
)
except HTTPException:
raise
except (IntegrityError, ValueError) as e:
logger.error("Dropbox OAuth callback error: %s", str(e), exc_info=True)
return RedirectResponse(
url=f"{config.NEXT_FRONTEND_URL}/dashboard?error=dropbox_auth_error"
)
@router.get("/connectors/{connector_id}/dropbox/folders")
async def list_dropbox_folders(
connector_id: int,
parent_path: str = "",
session: AsyncSession = Depends(get_async_session),
user: User = Depends(current_active_user),
):
"""List folders and files in user's Dropbox."""
connector = None
try:
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id,
SearchSourceConnector.user_id == user.id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.DROPBOX_CONNECTOR,
)
)
connector = result.scalars().first()
if not connector:
raise HTTPException(
status_code=404, detail="Dropbox connector not found or access denied"
)
dropbox_client = DropboxClient(session, connector_id)
items, error = await list_folder_contents(dropbox_client, path=parent_path)
if error:
error_lower = error.lower()
if (
"401" in error
or "authentication expired" in error_lower
or "expired_access_token" in error_lower
):
try:
if connector and not connector.config.get("auth_expired"):
connector.config = {**connector.config, "auth_expired": True}
flag_modified(connector, "config")
await session.commit()
except Exception:
logger.warning(
"Failed to persist auth_expired for connector %s",
connector_id,
exc_info=True,
)
raise HTTPException(
status_code=400,
detail="Dropbox authentication expired. Please re-authenticate.",
)
raise HTTPException(
status_code=500, detail=f"Failed to list folder contents: {error}"
)
return {"items": items}
except HTTPException:
raise
except Exception as e:
logger.error("Error listing Dropbox contents: %s", str(e), exc_info=True)
error_lower = str(e).lower()
if "401" in str(e) or "authentication expired" in error_lower:
try:
if connector and not connector.config.get("auth_expired"):
connector.config = {**connector.config, "auth_expired": True}
flag_modified(connector, "config")
await session.commit()
except Exception:
pass
raise HTTPException(
status_code=400,
detail="Dropbox authentication expired. Please re-authenticate.",
) from e
raise HTTPException(
status_code=500, detail=f"Failed to list Dropbox contents: {e!s}"
) from e
async def refresh_dropbox_token(
session: AsyncSession, connector: SearchSourceConnector
) -> SearchSourceConnector:
"""Refresh Dropbox OAuth tokens."""
logger.info("Refreshing Dropbox OAuth tokens for connector %s", connector.id)
token_encryption = get_token_encryption()
is_encrypted = connector.config.get("_token_encrypted", False)
refresh_token = connector.config.get("refresh_token")
if is_encrypted and refresh_token:
try:
refresh_token = token_encryption.decrypt_token(refresh_token)
except Exception as e:
logger.error("Failed to decrypt refresh token: %s", str(e))
raise HTTPException(
status_code=500, detail="Failed to decrypt stored refresh token"
) from e
if not refresh_token:
raise HTTPException(
status_code=400,
detail=f"No refresh token available for connector {connector.id}",
)
refresh_data = {
"client_id": config.DROPBOX_APP_KEY,
"client_secret": config.DROPBOX_APP_SECRET,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}
async with httpx.AsyncClient() as client:
token_response = await client.post(
TOKEN_URL,
data=refresh_data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=30.0,
)
if token_response.status_code != 200:
error_detail = token_response.text
error_code = ""
try:
error_json = token_response.json()
error_detail = error_json.get("error_description", error_detail)
error_code = error_json.get("error", "")
except Exception:
pass
error_lower = (error_detail + error_code).lower()
if (
"invalid_grant" in error_lower
or "expired" in error_lower
or "revoked" in error_lower
):
raise HTTPException(
status_code=401,
detail="Dropbox authentication failed. Please re-authenticate.",
)
raise HTTPException(
status_code=400, detail=f"Token refresh failed: {error_detail}"
)
token_json = token_response.json()
access_token = token_json.get("access_token")
if not access_token:
raise HTTPException(
status_code=400, detail="No access token received from Dropbox refresh"
)
expires_at = None
expires_in = token_json.get("expires_in")
if expires_in:
expires_at = datetime.now(UTC) + timedelta(seconds=int(expires_in))
cfg = dict(connector.config)
cfg["access_token"] = token_encryption.encrypt_token(access_token)
cfg["expires_in"] = expires_in
cfg["expires_at"] = expires_at.isoformat() if expires_at else None
cfg["_token_encrypted"] = True
cfg.pop("auth_expired", None)
connector.config = cfg
flag_modified(connector, "config")
await session.commit()
await session.refresh(connector)
logger.info("Successfully refreshed Dropbox tokens for connector %s", connector.id)
return connector

View file

@ -1046,6 +1046,52 @@ async def index_connector_content(
)
response_message = "OneDrive indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.DROPBOX_CONNECTOR:
from app.tasks.celery_tasks.connector_tasks import (
index_dropbox_files_task,
)
if drive_items and drive_items.has_items():
logger.info(
f"Triggering Dropbox indexing for connector {connector_id} into search space {search_space_id}, "
f"folders: {len(drive_items.folders)}, files: {len(drive_items.files)}"
)
items_dict = drive_items.model_dump()
else:
config = connector.config or {}
selected_folders = config.get("selected_folders", [])
selected_files = config.get("selected_files", [])
if not selected_folders and not selected_files:
raise HTTPException(
status_code=400,
detail="Dropbox indexing requires folders or files to be configured. "
"Please select folders/files to index.",
)
indexing_options = config.get(
"indexing_options",
{
"max_files_per_folder": 100,
"include_subfolders": True,
},
)
items_dict = {
"folders": selected_folders,
"files": selected_files,
"indexing_options": indexing_options,
}
logger.info(
f"Triggering Dropbox indexing for connector {connector_id} into search space {search_space_id} "
f"using existing config"
)
index_dropbox_files_task.delay(
connector_id,
search_space_id,
str(user.id),
items_dict,
)
response_message = "Dropbox indexing started in the background."
elif connector.connector_type == SearchSourceConnectorType.DISCORD_CONNECTOR:
from app.tasks.celery_tasks.connector_tasks import (
index_discord_messages_task,
@ -2644,6 +2690,114 @@ async def run_onedrive_indexing(
logger.error(f"Failed to update notification: {notif_error!s}")
async def run_dropbox_indexing(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict,
):
"""Runs the Dropbox indexing task for folders and files with notifications."""
from uuid import UUID
notification = None
try:
from app.tasks.connector_indexers.dropbox_indexer import index_dropbox_files
connector_result = await session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.id == connector_id
)
)
connector = connector_result.scalar_one_or_none()
if connector:
notification = await NotificationService.connector_indexing.notify_google_drive_indexing_started(
session=session,
user_id=UUID(user_id),
connector_id=connector_id,
connector_name=connector.name,
connector_type=connector.connector_type.value,
search_space_id=search_space_id,
folder_count=len(items_dict.get("folders", [])),
file_count=len(items_dict.get("files", [])),
folder_names=[
f.get("name", "Unknown") for f in items_dict.get("folders", [])
],
file_names=[
f.get("name", "Unknown") for f in items_dict.get("files", [])
],
)
if notification:
await NotificationService.connector_indexing.notify_indexing_progress(
session=session,
notification=notification,
indexed_count=0,
stage="fetching",
)
total_indexed, total_skipped, error_message = await index_dropbox_files(
session,
connector_id,
search_space_id,
user_id,
items_dict,
)
if error_message:
logger.error(
f"Dropbox indexing completed with errors for connector {connector_id}: {error_message}"
)
if _is_auth_error(error_message):
await _persist_auth_expired(session, connector_id)
error_message = (
"Dropbox authentication expired. Please re-authenticate."
)
else:
if notification:
await session.refresh(notification)
await NotificationService.connector_indexing.notify_indexing_progress(
session=session,
notification=notification,
indexed_count=total_indexed,
stage="storing",
)
logger.info(
f"Dropbox indexing successful for connector {connector_id}. Indexed {total_indexed} documents."
)
await _update_connector_timestamp_by_id(session, connector_id)
await session.commit()
if notification:
await session.refresh(notification)
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=total_indexed,
error_message=error_message,
skipped_count=total_skipped,
)
except Exception as e:
logger.error(
f"Critical error in run_dropbox_indexing for connector {connector_id}: {e}",
exc_info=True,
)
if notification:
try:
await session.refresh(notification)
await NotificationService.connector_indexing.notify_indexing_completed(
session=session,
notification=notification,
indexed_count=0,
error_message=str(e),
)
except Exception as notif_error:
logger.error(f"Failed to update notification: {notif_error!s}")
# Add new helper functions for luma indexing
async def run_luma_indexing_with_new_session(
connector_id: int,