fix(dropbox): mark documents failed on ETL failure

This commit is contained in:
CREDO23 2026-06-09 23:39:25 +02:00
parent 82aaaa5a9f
commit e45e8389dc

View file

@@ -32,6 +32,7 @@ from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
get_connector_by_id,
mark_connector_documents_failed,
update_connector_last_indexed,
)
@@ -158,15 +159,20 @@ async def _download_files_parallel(
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[list[ConnectorDocument], int]:
"""Download and ETL files in parallel. Returns (docs, failed_count)."""
) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]:
"""Download and ETL files in parallel.
Returns (docs, failed_files), where failed_files is a list of
(file_id, reason) so callers can mark those placeholders failed.
"""
results: list[ConnectorDocument] = []
sem = asyncio.Semaphore(max_concurrency)
last_heartbeat = time.time()
completed_count = 0
hb_lock = asyncio.Lock()
async def _download_one(file: dict) -> ConnectorDocument | None:
async def _download_one(file: dict) -> ConnectorDocument | str:
# ConnectorDocument on success; failure reason string otherwise.
nonlocal last_heartbeat, completed_count
async with sem:
markdown, db_metadata, error = await download_and_extract_content(
@@ -176,7 +182,7 @@ async def _download_files_parallel(
file_name = file.get("name", "Unknown")
reason = error or "empty content"
logger.warning(f"Download/ETL failed for {file_name}: {reason}")
return None
return f"Download/ETL failed: {reason}"
doc = _build_connector_doc(
file,
markdown,
@@ -197,14 +203,28 @@ async def _download_files_parallel(
tasks = [_download_one(f) for f in files]
outcomes = await asyncio.gather(*tasks, return_exceptions=True)
failed = 0
for outcome in outcomes:
if isinstance(outcome, Exception) or outcome is None:
failed += 1
else:
failed_files: list[tuple[str, str]] = []
for file, outcome in zip(files, outcomes, strict=False):
if isinstance(outcome, ConnectorDocument):
results.append(outcome)
continue
file_id = file.get("id")
if isinstance(outcome, Exception):
reason = f"Download/ETL error: {outcome}"
logger.warning(
"Download/ETL exception for %s: %s",
file.get("name", "Unknown"),
outcome,
exc_info=outcome,
)
elif isinstance(outcome, str):
reason = outcome
else:
reason = "Download or extraction failed"
if file_id:
failed_files.append((file_id, reason))
return results, failed
return results, failed_files
async def _download_and_index(
@@ -219,7 +239,7 @@ async def _download_and_index(
vision_llm=None,
) -> tuple[int, int]:
"""Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
connector_docs, download_failed = await _download_files_parallel(
connector_docs, failed_files = await _download_files_parallel(
dropbox_client,
files,
connector_id=connector_id,
@@ -229,6 +249,15 @@ async def _download_and_index(
vision_llm=vision_llm,
)
# Fail rows for files whose download/ETL failed, so they don't stay stuck.
if failed_files:
await mark_connector_documents_failed(
session,
document_type=DocumentType.DROPBOX_FILE,
search_space_id=search_space_id,
failures=failed_files,
)
batch_indexed = 0
batch_failed = 0
if connector_docs:
@@ -239,7 +268,7 @@ async def _download_and_index(
on_heartbeat=on_heartbeat,
)
return batch_indexed, download_failed + batch_failed
return batch_indexed, len(failed_files) + batch_failed
async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):