fix(google-drive): mark placeholders failed on ETL failure

This commit is contained in:
CREDO23 2026-06-09 23:39:25 +02:00
parent cb10882dc8
commit 6fd95f82b4

View file

@ -45,6 +45,7 @@ from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
check_document_by_unique_identifier,
get_connector_by_id,
mark_connector_documents_failed,
update_connector_last_indexed,
)
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
@ -458,10 +459,11 @@ async def _download_files_parallel(
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[list[ConnectorDocument], int]:
"""Download and ETL files in parallel, returning ConnectorDocuments.
) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]:
"""Download and ETL files in parallel.
Returns (connector_docs, download_failed_count).
Returns (connector_docs, failed_files), where failed_files is a list of
(file_id, reason) so callers can mark those placeholders failed.
"""
results: list[ConnectorDocument] = []
sem = asyncio.Semaphore(max_concurrency)
@ -469,7 +471,8 @@ async def _download_files_parallel(
completed_count = 0
hb_lock = asyncio.Lock()
async def _download_one(file: dict) -> ConnectorDocument | None:
async def _download_one(file: dict) -> ConnectorDocument | str:
# ConnectorDocument on success; failure reason string otherwise.
nonlocal last_heartbeat, completed_count
async with sem:
markdown, drive_metadata, error = await download_and_extract_content(
@ -479,7 +482,7 @@ async def _download_files_parallel(
file_name = file.get("name", "Unknown")
reason = error or "empty content"
logger.warning(f"Download/ETL failed for {file_name}: {reason}")
return None
return f"Download/ETL failed: {reason}"
doc = _build_connector_doc(
file,
markdown,
@ -500,14 +503,28 @@ async def _download_files_parallel(
tasks = [_download_one(f) for f in files]
outcomes = await asyncio.gather(*tasks, return_exceptions=True)
failed = 0
for outcome in outcomes:
if isinstance(outcome, Exception) or outcome is None:
failed += 1
else:
failed_files: list[tuple[str, str]] = []
for file, outcome in zip(files, outcomes, strict=False):
if isinstance(outcome, ConnectorDocument):
results.append(outcome)
continue
file_id = file.get("id")
if isinstance(outcome, Exception):
reason = f"Download/ETL error: {outcome}"
logger.warning(
"Download/ETL exception for %s: %s",
file.get("name", "Unknown"),
outcome,
exc_info=outcome,
)
elif isinstance(outcome, str):
reason = outcome
else:
reason = "Download or extraction failed"
if file_id:
failed_files.append((file_id, reason))
return results, failed
return results, failed_files
async def _process_single_file(
@ -542,7 +559,16 @@ async def _process_single_file(
drive_client, file, vision_llm=vision_llm
)
if error or not markdown:
logger.warning(f"ETL failed for {file_name}: {error}")
reason = error or "empty content"
logger.warning(f"ETL failed for {file_name}: {reason}")
file_id = file.get("id")
if file_id:
await mark_connector_documents_failed(
session,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
search_space_id=search_space_id,
failures=[(file_id, f"Download/ETL failed: {reason}")],
)
return 0, 1, 0
doc = _build_connector_doc(
@ -630,7 +656,7 @@ async def _download_and_index(
Returns (batch_indexed, total_failed).
"""
connector_docs, download_failed = await _download_files_parallel(
connector_docs, failed_files = await _download_files_parallel(
drive_client,
files,
connector_id=connector_id,
@ -640,6 +666,16 @@ async def _download_and_index(
vision_llm=vision_llm,
)
# Fail the placeholders for files whose download/ETL failed, so they don't
# stay stuck in 'pending'.
if failed_files:
await mark_connector_documents_failed(
session,
document_type=DocumentType.GOOGLE_DRIVE_FILE,
search_space_id=search_space_id,
failures=failed_files,
)
batch_indexed = 0
batch_failed = 0
if connector_docs:
@ -650,7 +686,7 @@ async def _download_and_index(
on_heartbeat=on_heartbeat,
)
return batch_indexed, download_failed + batch_failed
return batch_indexed, len(failed_files) + batch_failed
async def _index_selected_files(