From e45e8389dc95300c70ac09618b3887d82e186a6f Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH] fix(dropbox): mark documents failed on ETL failure --- .../connector_indexers/dropbox_indexer.py | 53 ++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py index 6e61bce18..76fa34159 100644 --- a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py @@ -32,6 +32,7 @@ from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, get_connector_by_id, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -158,15 +159,20 @@ async def _download_files_parallel( max_concurrency: int = 3, on_heartbeat: HeartbeatCallbackType | None = None, vision_llm=None, -) -> tuple[list[ConnectorDocument], int]: - """Download and ETL files in parallel. Returns (docs, failed_count).""" +) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]: + """Download and ETL files in parallel. + + Returns (docs, failed_files), where failed_files is a list of + (file_id, reason) so callers can mark those placeholders failed. + """ results: list[ConnectorDocument] = [] sem = asyncio.Semaphore(max_concurrency) last_heartbeat = time.time() completed_count = 0 hb_lock = asyncio.Lock() - async def _download_one(file: dict) -> ConnectorDocument | None: + async def _download_one(file: dict) -> ConnectorDocument | str: + # ConnectorDocument on success; failure reason string otherwise. nonlocal last_heartbeat, completed_count async with sem: markdown, db_metadata, error = await download_and_extract_content( @@ -176,7 +182,7 @@ async def _download_files_parallel( file_name = file.get("name", "Unknown") reason = error or "empty content" logger.warning(f"Download/ETL failed for {file_name}: {reason}") - return None + return f"Download/ETL failed: {reason}" doc = _build_connector_doc( file, markdown, @@ -197,14 +203,28 @@ async def _download_files_parallel( tasks = [_download_one(f) for f in files] outcomes = await asyncio.gather(*tasks, return_exceptions=True) - failed = 0 - for outcome in outcomes: - if isinstance(outcome, Exception) or outcome is None: - failed += 1 - else: + failed_files: list[tuple[str, str]] = [] + for file, outcome in zip(files, outcomes, strict=False): + if isinstance(outcome, ConnectorDocument): results.append(outcome) + continue + file_id = file.get("id") + if isinstance(outcome, Exception): + reason = f"Download/ETL error: {outcome}" + logger.warning( + "Download/ETL exception for %s: %s", + file.get("name", "Unknown"), + outcome, + exc_info=outcome, + ) + elif isinstance(outcome, str): + reason = outcome + else: + reason = "Download or extraction failed" + if file_id: + failed_files.append((file_id, reason)) - return results, failed + return results, failed_files async def _download_and_index( @@ -219,7 +239,7 @@ async def _download_and_index( vision_llm=None, ) -> tuple[int, int]: """Parallel download then parallel indexing. Returns (batch_indexed, total_failed).""" - connector_docs, download_failed = await _download_files_parallel( + connector_docs, failed_files = await _download_files_parallel( dropbox_client, files, connector_id=connector_id, @@ -229,6 +249,15 @@ async def _download_and_index( vision_llm=vision_llm, ) + # Fail rows for files whose download/ETL failed, so they don't stay stuck. + if failed_files: + await mark_connector_documents_failed( + session, + document_type=DocumentType.DROPBOX_FILE, + search_space_id=search_space_id, + failures=failed_files, + ) + batch_indexed = 0 batch_failed = 0 if connector_docs: @@ -239,7 +268,7 @@ async def _download_and_index( on_heartbeat=on_heartbeat, ) - return batch_indexed, download_failed + batch_failed + return batch_indexed, len(failed_files) + batch_failed async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):