feat: implement Dropbox API client and folder management for enhanced file indexing

This commit is contained in:
Anish Sarkar 2026-03-30 22:17:50 +05:30
parent 54828522f8
commit 1f12151e03
5 changed files with 1008 additions and 0 deletions

View file

@ -0,0 +1,317 @@
"""Dropbox API client using Dropbox HTTP API v2."""
import json
import logging
from datetime import UTC, datetime, timedelta
from typing import Any
import httpx
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from sqlalchemy.orm.attributes import flag_modified
from app.config import config
from app.db import SearchSourceConnector
from app.utils.oauth_security import TokenEncryption
logger = logging.getLogger(__name__)
API_BASE = "https://api.dropboxapi.com"
CONTENT_BASE = "https://content.dropboxapi.com"
TOKEN_URL = "https://api.dropboxapi.com/oauth2/token"
class DropboxClient:
"""Client for Dropbox via the HTTP API v2."""
def __init__(self, session: AsyncSession, connector_id: int):
self._session = session
self._connector_id = connector_id
async def _get_valid_token(self) -> str:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
connector = result.scalars().first()
if not connector:
raise ValueError(f"Connector {self._connector_id} not found")
cfg = connector.config or {}
is_encrypted = cfg.get("_token_encrypted", False)
token_encryption = (
TokenEncryption(config.SECRET_KEY) if config.SECRET_KEY else None
)
access_token = cfg.get("access_token", "")
refresh_token = cfg.get("refresh_token")
if is_encrypted and token_encryption:
if access_token:
access_token = token_encryption.decrypt_token(access_token)
if refresh_token:
refresh_token = token_encryption.decrypt_token(refresh_token)
expires_at_str = cfg.get("expires_at")
is_expired = False
if expires_at_str:
expires_at = datetime.fromisoformat(expires_at_str)
if expires_at.tzinfo is None:
expires_at = expires_at.replace(tzinfo=UTC)
is_expired = expires_at <= datetime.now(UTC)
if not is_expired and access_token:
return access_token
if not refresh_token:
cfg["auth_expired"] = True
connector.config = cfg
flag_modified(connector, "config")
await self._session.commit()
raise ValueError("Dropbox token expired and no refresh token available")
token_data = await self._refresh_token(refresh_token)
new_access = token_data["access_token"]
expires_in = token_data.get("expires_in")
new_expires_at = None
if expires_in:
new_expires_at = datetime.now(UTC) + timedelta(seconds=int(expires_in))
if token_encryption:
cfg["access_token"] = token_encryption.encrypt_token(new_access)
else:
cfg["access_token"] = new_access
cfg["expires_at"] = new_expires_at.isoformat() if new_expires_at else None
cfg["expires_in"] = expires_in
cfg["_token_encrypted"] = bool(token_encryption)
cfg.pop("auth_expired", None)
connector.config = cfg
flag_modified(connector, "config")
await self._session.commit()
return new_access
async def _refresh_token(self, refresh_token: str) -> dict:
data = {
"client_id": config.DROPBOX_APP_KEY,
"client_secret": config.DROPBOX_APP_SECRET,
"grant_type": "refresh_token",
"refresh_token": refresh_token,
}
async with httpx.AsyncClient() as client:
resp = await client.post(
TOKEN_URL,
data=data,
headers={"Content-Type": "application/x-www-form-urlencoded"},
timeout=30.0,
)
if resp.status_code != 200:
error_detail = resp.text
try:
error_json = resp.json()
error_detail = error_json.get("error_description", error_detail)
except Exception:
pass
raise ValueError(f"Dropbox token refresh failed: {error_detail}")
return resp.json()
async def _request(
self, path: str, json_body: dict | None = None, **kwargs
) -> httpx.Response:
"""Make an authenticated RPC request to the Dropbox API."""
token = await self._get_valid_token()
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json",
}
if "headers" in kwargs:
headers.update(kwargs.pop("headers"))
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{API_BASE}{path}",
headers=headers,
json=json_body,
timeout=60.0,
**kwargs,
)
if resp.status_code == 401:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
connector = result.scalars().first()
if connector:
cfg = connector.config or {}
cfg["auth_expired"] = True
connector.config = cfg
flag_modified(connector, "config")
await self._session.commit()
raise ValueError("Dropbox authentication expired (401)")
return resp
async def _content_request(
self, path: str, api_arg: dict, content: bytes | None = None, **kwargs
) -> httpx.Response:
"""Make an authenticated content-upload/download request."""
token = await self._get_valid_token()
headers = {
"Authorization": f"Bearer {token}",
"Dropbox-API-Arg": json.dumps(api_arg),
"Content-Type": "application/octet-stream",
}
if "headers" in kwargs:
headers.update(kwargs.pop("headers"))
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{CONTENT_BASE}{path}",
headers=headers,
content=content or b"",
timeout=120.0,
**kwargs,
)
if resp.status_code == 401:
result = await self._session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == self._connector_id
)
)
connector = result.scalars().first()
if connector:
cfg = connector.config or {}
cfg["auth_expired"] = True
connector.config = cfg
flag_modified(connector, "config")
await self._session.commit()
raise ValueError("Dropbox authentication expired (401)")
return resp
async def list_folder(
self, path: str = ""
) -> tuple[list[dict[str, Any]], str | None]:
"""List all items in a folder. Handles pagination via cursor."""
all_items: list[dict[str, Any]] = []
resp = await self._request(
"/2/files/list_folder",
{"path": path, "recursive": False, "include_non_downloadable_files": True},
)
if resp.status_code != 200:
return [], f"Failed to list folder: {resp.status_code} - {resp.text}"
data = resp.json()
all_items.extend(data.get("entries", []))
while data.get("has_more"):
cursor = data["cursor"]
resp = await self._request(
"/2/files/list_folder/continue", {"cursor": cursor}
)
if resp.status_code != 200:
return all_items, f"Pagination failed: {resp.status_code}"
data = resp.json()
all_items.extend(data.get("entries", []))
return all_items, None
async def get_metadata(
self, path: str
) -> tuple[dict[str, Any] | None, str | None]:
resp = await self._request("/2/files/get_metadata", {"path": path})
if resp.status_code != 200:
return None, f"Failed to get metadata: {resp.status_code} - {resp.text}"
return resp.json(), None
async def download_file(self, path: str) -> tuple[bytes | None, str | None]:
resp = await self._content_request(
"/2/files/download", {"path": path}
)
if resp.status_code != 200:
return None, f"Download failed: {resp.status_code}"
return resp.content, None
async def download_file_to_disk(self, path: str, dest_path: str) -> str | None:
"""Stream file content to disk. Returns error message on failure."""
token = await self._get_valid_token()
headers = {
"Authorization": f"Bearer {token}",
"Dropbox-API-Arg": json.dumps({"path": path}),
}
async with (
httpx.AsyncClient() as client,
client.stream(
"POST",
f"{CONTENT_BASE}/2/files/download",
headers=headers,
timeout=120.0,
) as resp,
):
if resp.status_code != 200:
return f"Download failed: {resp.status_code}"
with open(dest_path, "wb") as f:
async for chunk in resp.aiter_bytes(chunk_size=5 * 1024 * 1024):
f.write(chunk)
return None
async def upload_file(
self,
path: str,
content: bytes,
mode: str = "add",
autorename: bool = True,
) -> dict[str, Any]:
"""Upload a file to Dropbox (up to 150MB)."""
api_arg = {"path": path, "mode": mode, "autorename": autorename}
resp = await self._content_request("/2/files/upload", api_arg, content)
if resp.status_code != 200:
raise ValueError(f"Upload failed: {resp.status_code} - {resp.text}")
return resp.json()
async def create_paper_doc(
self, path: str, markdown_content: str
) -> dict[str, Any]:
"""Create a Dropbox Paper document from markdown."""
token = await self._get_valid_token()
api_arg = {"import_format": "markdown", "path": path}
headers = {
"Authorization": f"Bearer {token}",
"Dropbox-API-Arg": json.dumps(api_arg),
"Content-Type": "application/octet-stream",
}
async with httpx.AsyncClient() as client:
resp = await client.post(
f"{API_BASE}/2/files/paper/create",
headers=headers,
content=markdown_content.encode("utf-8"),
timeout=60.0,
)
if resp.status_code != 200:
raise ValueError(
f"Paper doc creation failed: {resp.status_code} - {resp.text}"
)
return resp.json()
async def delete_file(self, path: str) -> dict[str, Any]:
"""Delete a file or folder."""
resp = await self._request("/2/files/delete_v2", {"path": path})
if resp.status_code != 200:
raise ValueError(f"Delete failed: {resp.status_code} - {resp.text}")
return resp.json()
async def get_current_account(self) -> tuple[dict[str, Any] | None, str | None]:
"""Get current user's account info."""
resp = await self._request("/2/users/get_current_account", None)
if resp.status_code != 200:
return None, f"Failed to get account: {resp.status_code}"
return resp.json(), None

View file

@ -0,0 +1,48 @@
"""File type handlers for Dropbox."""
SKIP_EXTENSIONS = frozenset(
{
".paper", # Dropbox Paper docs are not downloadable via /files/download
}
)
MIME_TO_EXTENSION: dict[str, str] = {
"application/pdf": ".pdf",
"application/vnd.openxmlformats-officedocument.spreadsheetml.sheet": ".xlsx",
"application/vnd.openxmlformats-officedocument.wordprocessingml.document": ".docx",
"application/vnd.openxmlformats-officedocument.presentationml.presentation": ".pptx",
"application/vnd.ms-excel": ".xls",
"application/msword": ".doc",
"application/vnd.ms-powerpoint": ".ppt",
"text/plain": ".txt",
"text/csv": ".csv",
"text/html": ".html",
"text/markdown": ".md",
"application/json": ".json",
"application/xml": ".xml",
"image/png": ".png",
"image/jpeg": ".jpg",
}
def get_extension_from_name(name: str) -> str:
"""Extract extension from filename."""
dot = name.rfind(".")
if dot > 0:
return name[dot:]
return ""
def is_folder(item: dict) -> bool:
return item.get(".tag") == "folder"
def should_skip_file(item: dict) -> bool:
"""Skip folders and non-downloadable files."""
if is_folder(item):
return True
if not item.get("is_downloadable", True):
return True
name = item.get("name", "")
ext = get_extension_from_name(name).lower()
return ext in SKIP_EXTENSIONS

View file

@ -0,0 +1,92 @@
"""Folder management for Dropbox."""
import logging
from typing import Any
from .client import DropboxClient
from .file_types import is_folder, should_skip_file
logger = logging.getLogger(__name__)
async def list_folder_contents(
client: DropboxClient,
path: str = "",
) -> tuple[list[dict[str, Any]], str | None]:
"""List folders and files in a Dropbox folder.
Returns (items list with folders first, error message).
"""
try:
items, error = await client.list_folder(path)
if error:
return [], error
for item in items:
item["isFolder"] = is_folder(item)
items.sort(key=lambda x: (not x["isFolder"], x.get("name", "").lower()))
folder_count = sum(1 for item in items if item["isFolder"])
file_count = len(items) - folder_count
logger.info(
f"Listed {len(items)} items ({folder_count} folders, {file_count} files) "
+ (f"in folder {path}" if path else "in root")
)
return items, None
except Exception as e:
logger.error(f"Error listing folder contents: {e!s}", exc_info=True)
return [], f"Error listing folder contents: {e!s}"
async def get_files_in_folder(
client: DropboxClient,
path: str,
include_subfolders: bool = True,
) -> tuple[list[dict[str, Any]], str | None]:
"""Get all indexable files in a folder, optionally recursing into subfolders."""
try:
items, error = await client.list_folder(path)
if error:
return [], error
files: list[dict[str, Any]] = []
for item in items:
if is_folder(item):
if include_subfolders:
sub_files, sub_error = await get_files_in_folder(
client, item.get("path_lower", ""), include_subfolders=True
)
if sub_error:
logger.warning(
f"Error recursing into folder {item.get('name')}: {sub_error}"
)
continue
files.extend(sub_files)
elif not should_skip_file(item):
files.append(item)
return files, None
except Exception as e:
logger.error(f"Error getting files in folder: {e!s}", exc_info=True)
return [], f"Error getting files in folder: {e!s}"
async def get_file_by_path(
client: DropboxClient,
path: str,
) -> tuple[dict[str, Any] | None, str | None]:
"""Get file metadata by path."""
try:
item, error = await client.get_metadata(path)
if error:
return None, error
if not item:
return None, f"File not found: {path}"
return item, None
except Exception as e:
logger.error(f"Error getting file by path: {e!s}", exc_info=True)
return None, f"Error getting file by path: {e!s}"

View file

@ -574,6 +574,54 @@ async def _index_onedrive_files(
)
@celery_app.task(name="index_dropbox_files", bind=True)
def index_dropbox_files_task(
self,
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict,
):
"""Celery task to index Dropbox folders and files."""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(
_index_dropbox_files(
connector_id,
search_space_id,
user_id,
items_dict,
)
)
finally:
loop.close()
async def _index_dropbox_files(
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict,
):
"""Index Dropbox folders and files with new session."""
from app.routes.search_source_connectors_routes import (
run_dropbox_indexing,
)
async with get_celery_session_maker()() as session:
await run_dropbox_indexing(
session,
connector_id,
search_space_id,
user_id,
items_dict,
)
@celery_app.task(name="index_discord_messages", bind=True)
def index_discord_messages_task(
self,

View file

@ -0,0 +1,503 @@
"""Dropbox indexer using the shared IndexingPipelineService.
File-level pre-filter (_should_skip_file) handles content_hash and
server_modified checks. download_and_extract_content() returns
markdown which is fed into ConnectorDocument -> pipeline.
"""
import asyncio
import logging
import time
from collections.abc import Awaitable, Callable
from sqlalchemy import String, cast, select
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm.attributes import flag_modified
from app.config import config
from app.connectors.dropbox import (
DropboxClient,
download_and_extract_content,
get_file_by_path,
get_files_in_folder,
)
from app.connectors.dropbox.file_types import should_skip_file as skip_item
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
get_connector_by_id,
update_connector_last_indexed,
)
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
HEARTBEAT_INTERVAL_SECONDS = 30
logger = logging.getLogger(__name__)
async def _should_skip_file(
session: AsyncSession,
file: dict,
search_space_id: int,
) -> tuple[bool, str | None]:
"""Pre-filter: detect unchanged / rename-only files."""
file_id = file.get("id", "")
file_name = file.get("name", "Unknown")
if skip_item(file):
return True, "folder/non-downloadable"
if not file_id:
return True, "missing file_id"
primary_hash = compute_identifier_hash(
DocumentType.DROPBOX_FILE.value, file_id, search_space_id
)
existing = await check_document_by_unique_identifier(session, primary_hash)
if not existing:
result = await session.execute(
select(Document).where(
Document.search_space_id == search_space_id,
Document.document_type == DocumentType.DROPBOX_FILE,
cast(Document.document_metadata["dropbox_file_id"], String) == file_id,
)
)
existing = result.scalar_one_or_none()
if existing:
existing.unique_identifier_hash = primary_hash
logger.debug(f"Found Dropbox doc by metadata for file_id: {file_id}")
if not existing:
return False, None
incoming_content_hash = file.get("content_hash")
meta = existing.document_metadata or {}
stored_content_hash = meta.get("content_hash")
incoming_mtime = file.get("server_modified")
stored_mtime = meta.get("modified_time")
content_unchanged = False
if incoming_content_hash and stored_content_hash:
content_unchanged = incoming_content_hash == stored_content_hash
elif incoming_content_hash and not stored_content_hash:
return False, None
elif not incoming_content_hash and incoming_mtime and stored_mtime:
content_unchanged = incoming_mtime == stored_mtime
elif not incoming_content_hash:
return False, None
if not content_unchanged:
return False, None
old_name = meta.get("dropbox_file_name")
if old_name and old_name != file_name:
existing.title = file_name
if not existing.document_metadata:
existing.document_metadata = {}
existing.document_metadata["dropbox_file_name"] = file_name
if incoming_mtime:
existing.document_metadata["modified_time"] = incoming_mtime
flag_modified(existing, "document_metadata")
await session.commit()
logger.info(f"Rename-only update: '{old_name}' -> '{file_name}'")
return True, f"File renamed: '{old_name}' -> '{file_name}'"
if not DocumentStatus.is_state(existing.status, DocumentStatus.READY):
return True, "skipped (previously failed)"
return True, "unchanged"
def _build_connector_doc(
file: dict,
markdown: str,
dropbox_metadata: dict,
*,
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
file_id = file.get("id", "")
file_name = file.get("name", "Unknown")
metadata = {
**dropbox_metadata,
"connector_id": connector_id,
"document_type": "Dropbox File",
"connector_type": "Dropbox",
}
fallback_summary = f"File: {file_name}\n\n{markdown[:4000]}"
return ConnectorDocument(
title=file_name,
source_markdown=markdown,
unique_id=file_id,
document_type=DocumentType.DROPBOX_FILE,
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
async def _download_files_parallel(
dropbox_client: DropboxClient,
files: list[dict],
*,
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[list[ConnectorDocument], int]:
"""Download and ETL files in parallel. Returns (docs, failed_count)."""
results: list[ConnectorDocument] = []
sem = asyncio.Semaphore(max_concurrency)
last_heartbeat = time.time()
completed_count = 0
hb_lock = asyncio.Lock()
async def _download_one(file: dict) -> ConnectorDocument | None:
nonlocal last_heartbeat, completed_count
async with sem:
markdown, db_metadata, error = await download_and_extract_content(
dropbox_client, file
)
if error or not markdown:
file_name = file.get("name", "Unknown")
reason = error or "empty content"
logger.warning(f"Download/ETL failed for {file_name}: {reason}")
return None
doc = _build_connector_doc(
file,
markdown,
db_metadata,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
async with hb_lock:
completed_count += 1
if on_heartbeat:
now = time.time()
if now - last_heartbeat >= HEARTBEAT_INTERVAL_SECONDS:
await on_heartbeat(completed_count)
last_heartbeat = now
return doc
tasks = [_download_one(f) for f in files]
outcomes = await asyncio.gather(*tasks, return_exceptions=True)
failed = 0
for outcome in outcomes:
if isinstance(outcome, Exception) or outcome is None:
failed += 1
else:
results.append(outcome)
return results, failed
async def _download_and_index(
dropbox_client: DropboxClient,
session: AsyncSession,
files: list[dict],
*,
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int]:
"""Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
connector_docs, download_failed = await _download_files_parallel(
dropbox_client,
files,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
)
batch_indexed = 0
batch_failed = 0
if connector_docs:
pipeline = IndexingPipelineService(session)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat,
)
return batch_indexed, download_failed + batch_failed
async def _index_full_scan(
dropbox_client: DropboxClient,
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
folder_path: str,
folder_name: str,
task_logger: TaskLoggingService,
log_entry: object,
max_files: int,
include_subfolders: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
) -> tuple[int, int]:
"""Full scan indexing of a folder."""
await task_logger.log_task_progress(
log_entry,
f"Starting full scan of folder: {folder_name}",
{
"stage": "full_scan",
"folder_path": folder_path,
"include_subfolders": include_subfolders,
},
)
renamed_count = 0
skipped = 0
files_to_download: list[dict] = []
all_files, error = await get_files_in_folder(
dropbox_client,
folder_path,
include_subfolders=include_subfolders,
)
if error:
err_lower = error.lower()
if "401" in error or "authentication expired" in err_lower:
raise Exception(
f"Dropbox authentication failed. Please re-authenticate. (Error: {error})"
)
raise Exception(f"Failed to list Dropbox files: {error}")
for file in all_files[:max_files]:
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
continue
files_to_download.append(file)
batch_indexed, failed = await _download_and_index(
dropbox_client,
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
)
indexed = renamed_count + batch_indexed
logger.info(
f"Full scan complete: {indexed} indexed, {skipped} skipped, {failed} failed"
)
return indexed, skipped
async def _index_selected_files(
dropbox_client: DropboxClient,
session: AsyncSession,
file_paths: list[tuple[str, str | None]],
*,
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
) -> tuple[int, int, list[str]]:
"""Index user-selected files using the parallel pipeline."""
files_to_download: list[dict] = []
errors: list[str] = []
renamed_count = 0
skipped = 0
for file_path, file_name in file_paths:
file, error = await get_file_by_path(dropbox_client, file_path)
if error or not file:
display = file_name or file_path
errors.append(f"File '{display}': {error or 'File not found'}")
continue
skip, msg = await _should_skip_file(session, file, search_space_id)
if skip:
if msg and "renamed" in msg.lower():
renamed_count += 1
else:
skipped += 1
continue
files_to_download.append(file)
batch_indexed, _failed = await _download_and_index(
dropbox_client,
session,
files_to_download,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
)
return renamed_count + batch_indexed, skipped, errors
async def index_dropbox_files(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
items_dict: dict,
) -> tuple[int, int, str | None]:
"""Index Dropbox files for a specific connector.
items_dict format:
{
"folders": [{"path": "...", "name": "..."}, ...],
"files": [{"path": "...", "name": "..."}, ...],
"indexing_options": {"max_files": 500, "include_subfolders": true}
}
"""
task_logger = TaskLoggingService(session, search_space_id)
log_entry = await task_logger.log_task_start(
task_name="dropbox_files_indexing",
source="connector_indexing_task",
message=f"Starting Dropbox indexing for connector {connector_id}",
metadata={"connector_id": connector_id, "user_id": str(user_id)},
)
try:
connector = await get_connector_by_id(
session, connector_id, SearchSourceConnectorType.DROPBOX_CONNECTOR
)
if not connector:
error_msg = f"Dropbox connector with ID {connector_id} not found"
await task_logger.log_task_failure(
log_entry, error_msg, None, {"error_type": "ConnectorNotFound"}
)
return 0, 0, error_msg
token_encrypted = connector.config.get("_token_encrypted", False)
if token_encrypted and not config.SECRET_KEY:
error_msg = "SECRET_KEY not configured but credentials are encrypted"
await task_logger.log_task_failure(
log_entry,
error_msg,
"Missing SECRET_KEY",
{"error_type": "MissingSecretKey"},
)
return 0, 0, error_msg
connector_enable_summary = getattr(connector, "enable_summary", True)
dropbox_client = DropboxClient(session, connector_id)
indexing_options = items_dict.get("indexing_options", {})
max_files = indexing_options.get("max_files", 500)
include_subfolders = indexing_options.get("include_subfolders", True)
total_indexed = 0
total_skipped = 0
selected_files = items_dict.get("files", [])
if selected_files:
file_tuples = [
(f.get("path", f.get("path_lower", "")), f.get("name"))
for f in selected_files
]
indexed, skipped, _errors = await _index_selected_files(
dropbox_client,
session,
file_tuples,
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector_enable_summary,
)
total_indexed += indexed
total_skipped += skipped
folders = items_dict.get("folders", [])
for folder in folders:
folder_path = folder.get("path", folder.get("path_lower", ""))
folder_name = folder.get("name", "Root")
logger.info(f"Using full scan for folder {folder_name}")
indexed, skipped = await _index_full_scan(
dropbox_client,
session,
connector_id,
search_space_id,
user_id,
folder_path,
folder_name,
task_logger,
log_entry,
max_files,
include_subfolders,
enable_summary=connector_enable_summary,
)
total_indexed += indexed
total_skipped += skipped
if total_indexed > 0 or folders:
await update_connector_last_indexed(session, connector, True)
await session.commit()
await task_logger.log_task_success(
log_entry,
f"Successfully completed Dropbox indexing for connector {connector_id}",
{"files_processed": total_indexed, "files_skipped": total_skipped},
)
logger.info(
f"Dropbox indexing completed: {total_indexed} indexed, {total_skipped} skipped"
)
return total_indexed, total_skipped, None
except SQLAlchemyError as db_error:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Database error during Dropbox indexing for connector {connector_id}",
str(db_error),
{"error_type": "SQLAlchemyError"},
)
logger.error(f"Database error: {db_error!s}", exc_info=True)
return 0, 0, f"Database error: {db_error!s}"
except Exception as e:
await session.rollback()
await task_logger.log_task_failure(
log_entry,
f"Failed to index Dropbox files for connector {connector_id}",
str(e),
{"error_type": type(e).__name__},
)
logger.error(f"Failed to index Dropbox files: {e!s}", exc_info=True)
return 0, 0, f"Failed to index Dropbox files: {e!s}"