mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-10 20:35:17 +02:00
fix(onedrive): mark documents failed on ETL failure
This commit is contained in:
parent
6fd95f82b4
commit
82aaaa5a9f
1 changed files with 41 additions and 12 deletions
|
|
@ -32,6 +32,7 @@ from app.services.task_logging_service import TaskLoggingService
|
|||
from app.tasks.connector_indexers.base import (
|
||||
check_document_by_unique_identifier,
|
||||
get_connector_by_id,
|
||||
mark_connector_documents_failed,
|
||||
update_connector_last_indexed,
|
||||
)
|
||||
|
||||
|
|
@ -165,15 +166,20 @@ async def _download_files_parallel(
|
|||
max_concurrency: int = 3,
|
||||
on_heartbeat: HeartbeatCallbackType | None = None,
|
||||
vision_llm=None,
|
||||
) -> tuple[list[ConnectorDocument], int]:
|
||||
"""Download and ETL files in parallel. Returns (docs, failed_count)."""
|
||||
) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]:
|
||||
"""Download and ETL files in parallel.
|
||||
|
||||
Returns (docs, failed_files), where failed_files is a list of
|
||||
(file_id, reason) so callers can mark those placeholders failed.
|
||||
"""
|
||||
results: list[ConnectorDocument] = []
|
||||
sem = asyncio.Semaphore(max_concurrency)
|
||||
last_heartbeat = time.time()
|
||||
completed_count = 0
|
||||
hb_lock = asyncio.Lock()
|
||||
|
||||
async def _download_one(file: dict) -> ConnectorDocument | None:
|
||||
async def _download_one(file: dict) -> ConnectorDocument | str:
|
||||
# ConnectorDocument on success; failure reason string otherwise.
|
||||
nonlocal last_heartbeat, completed_count
|
||||
async with sem:
|
||||
markdown, od_metadata, error = await download_and_extract_content(
|
||||
|
|
@ -183,7 +189,7 @@ async def _download_files_parallel(
|
|||
file_name = file.get("name", "Unknown")
|
||||
reason = error or "empty content"
|
||||
logger.warning(f"Download/ETL failed for {file_name}: {reason}")
|
||||
return None
|
||||
return f"Download/ETL failed: {reason}"
|
||||
doc = _build_connector_doc(
|
||||
file,
|
||||
markdown,
|
||||
|
|
@ -204,14 +210,28 @@ async def _download_files_parallel(
|
|||
tasks = [_download_one(f) for f in files]
|
||||
outcomes = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
failed = 0
|
||||
for outcome in outcomes:
|
||||
if isinstance(outcome, Exception) or outcome is None:
|
||||
failed += 1
|
||||
else:
|
||||
failed_files: list[tuple[str, str]] = []
|
||||
for file, outcome in zip(files, outcomes, strict=False):
|
||||
if isinstance(outcome, ConnectorDocument):
|
||||
results.append(outcome)
|
||||
continue
|
||||
file_id = file.get("id")
|
||||
if isinstance(outcome, Exception):
|
||||
reason = f"Download/ETL error: {outcome}"
|
||||
logger.warning(
|
||||
"Download/ETL exception for %s: %s",
|
||||
file.get("name", "Unknown"),
|
||||
outcome,
|
||||
exc_info=outcome,
|
||||
)
|
||||
elif isinstance(outcome, str):
|
||||
reason = outcome
|
||||
else:
|
||||
reason = "Download or extraction failed"
|
||||
if file_id:
|
||||
failed_files.append((file_id, reason))
|
||||
|
||||
return results, failed
|
||||
return results, failed_files
|
||||
|
||||
|
||||
async def _download_and_index(
|
||||
|
|
@ -226,7 +246,7 @@ async def _download_and_index(
|
|||
vision_llm=None,
|
||||
) -> tuple[int, int]:
|
||||
"""Parallel download then parallel indexing. Returns (batch_indexed, total_failed)."""
|
||||
connector_docs, download_failed = await _download_files_parallel(
|
||||
connector_docs, failed_files = await _download_files_parallel(
|
||||
onedrive_client,
|
||||
files,
|
||||
connector_id=connector_id,
|
||||
|
|
@ -236,6 +256,15 @@ async def _download_and_index(
|
|||
vision_llm=vision_llm,
|
||||
)
|
||||
|
||||
# Fail rows for files whose download/ETL failed, so they don't stay stuck.
|
||||
if failed_files:
|
||||
await mark_connector_documents_failed(
|
||||
session,
|
||||
document_type=DocumentType.ONEDRIVE_FILE,
|
||||
search_space_id=search_space_id,
|
||||
failures=failed_files,
|
||||
)
|
||||
|
||||
batch_indexed = 0
|
||||
batch_failed = 0
|
||||
if connector_docs:
|
||||
|
|
@ -246,7 +275,7 @@ async def _download_and_index(
|
|||
on_heartbeat=on_heartbeat,
|
||||
)
|
||||
|
||||
return batch_indexed, download_failed + batch_failed
|
||||
return batch_indexed, len(failed_files) + batch_failed
|
||||
|
||||
|
||||
async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int):
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue