From 6fd95f82b42e356efb869d27cfbea559ff5dc09b Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH] fix(google-drive): mark placeholders failed on ETL failure --- .../google_drive_indexer.py | 64 +++++++++++++++---- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 68c43716b..29550c215 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -45,6 +45,7 @@ from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, get_connector_by_id, + mark_connector_documents_failed, update_connector_last_indexed, ) from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES @@ -458,10 +459,11 @@ async def _download_files_parallel( max_concurrency: int = 3, on_heartbeat: HeartbeatCallbackType | None = None, vision_llm=None, -) -> tuple[list[ConnectorDocument], int]: - """Download and ETL files in parallel, returning ConnectorDocuments. +) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]: + """Download and ETL files in parallel. - Returns (connector_docs, download_failed_count). + Returns (connector_docs, failed_files), where failed_files is a list of + (file_id, reason) so callers can mark those placeholders failed. """ results: list[ConnectorDocument] = [] sem = asyncio.Semaphore(max_concurrency) @@ -469,7 +471,8 @@ async def _download_files_parallel( completed_count = 0 hb_lock = asyncio.Lock() - async def _download_one(file: dict) -> ConnectorDocument | None: + async def _download_one(file: dict) -> ConnectorDocument | str: + # ConnectorDocument on success; failure reason string otherwise. nonlocal last_heartbeat, completed_count async with sem: markdown, drive_metadata, error = await download_and_extract_content( @@ -479,7 +482,7 @@ async def _download_files_parallel( file_name = file.get("name", "Unknown") reason = error or "empty content" logger.warning(f"Download/ETL failed for {file_name}: {reason}") - return None + return f"Download/ETL failed: {reason}" doc = _build_connector_doc( file, markdown, @@ -500,14 +503,28 @@ async def _download_files_parallel( tasks = [_download_one(f) for f in files] outcomes = await asyncio.gather(*tasks, return_exceptions=True) - failed = 0 - for outcome in outcomes: - if isinstance(outcome, Exception) or outcome is None: - failed += 1 - else: + failed_files: list[tuple[str, str]] = [] + for file, outcome in zip(files, outcomes, strict=False): + if isinstance(outcome, ConnectorDocument): results.append(outcome) + continue + file_id = file.get("id") + if isinstance(outcome, Exception): + reason = f"Download/ETL error: {outcome}" + logger.warning( + "Download/ETL exception for %s: %s", + file.get("name", "Unknown"), + outcome, + exc_info=outcome, + ) + elif isinstance(outcome, str): + reason = outcome + else: + reason = "Download or extraction failed" + if file_id: + failed_files.append((file_id, reason)) - return results, failed + return results, failed_files async def _process_single_file( @@ -542,7 +559,16 @@ async def _process_single_file( drive_client, file, vision_llm=vision_llm ) if error or not markdown: - logger.warning(f"ETL failed for {file_name}: {error}") + reason = error or "empty content" + logger.warning(f"ETL failed for {file_name}: {reason}") + file_id = file.get("id") + if file_id: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=search_space_id, + failures=[(file_id, f"Download/ETL failed: {reason}")], + ) return 0, 1, 0 doc = _build_connector_doc( @@ -630,7 +656,7 @@ async def _download_and_index( Returns (batch_indexed, total_failed). """ - connector_docs, download_failed = await _download_files_parallel( + connector_docs, failed_files = await _download_files_parallel( drive_client, files, connector_id=connector_id, @@ -640,6 +666,16 @@ async def _download_and_index( vision_llm=vision_llm, ) + # Fail the placeholders for files whose download/ETL failed, so they don't + # stay stuck in 'pending'. + if failed_files: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=search_space_id, + failures=failed_files, + ) + batch_indexed = 0 batch_failed = 0 if connector_docs: @@ -650,7 +686,7 @@ async def _download_and_index( on_heartbeat=on_heartbeat, ) - return batch_indexed, download_failed + batch_failed + return batch_indexed, len(failed_files) + batch_failed async def _index_selected_files(