diff --git a/surfsense_backend/app/indexing_pipeline/document_persistence.py b/surfsense_backend/app/indexing_pipeline/document_persistence.py index d7810e516..9fd8867e2 100644 --- a/surfsense_backend/app/indexing_pipeline/document_persistence.py +++ b/surfsense_backend/app/indexing_pipeline/document_persistence.py @@ -1,3 +1,5 @@ +import contextlib +import logging from datetime import UTC, datetime from sqlalchemy.ext.asyncio import AsyncSession @@ -6,6 +8,8 @@ from sqlalchemy.orm.attributes import set_committed_value from app.db import Document, DocumentStatus +logger = logging.getLogger(__name__) + async def rollback_and_persist_failure( session: AsyncSession, document: Document, message: str @@ -18,14 +22,28 @@ async def rollback_and_persist_failure( try: await session.rollback() except Exception: - return # Session is completely dead; nothing further we can do. + # Session is completely dead; surface it but never raise. + logger.warning( + "Rollback failed; cannot persist failed status for document %s", + getattr(document, "id", "unknown"), + exc_info=True, + ) + return try: await session.refresh(document) document.updated_at = datetime.now(UTC) document.status = DocumentStatus.failed(message) await session.commit() except Exception: - pass # Best-effort; document will be retried on the next sync. + # Best-effort: the document stays non-ready and is retried next sync. + # Log it so a permanently-stuck document is at least traceable. + logger.warning( + "Could not persist failed status for document %s; will retry next sync", + getattr(document, "id", "unknown"), + exc_info=True, + ) + with contextlib.suppress(Exception): + await session.rollback() def attach_chunks_to_document(document: Document, chunks: list) -> None: diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index ac38b7bf7..e2a1b109a 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -6,6 +6,7 @@ Implements real-time document status updates using a two-phase approach: - Phase 2: Process each document one by one (pending → processing → ready/failed) """ +import contextlib import time from collections.abc import Awaitable, Callable @@ -432,10 +433,15 @@ async def index_airtable_records( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/base.py b/surfsense_backend/app/tasks/connector_indexers/base.py index ffc8ab72e..9408874ca 100644 --- a/surfsense_backend/app/tasks/connector_indexers/base.py +++ b/surfsense_backend/app/tasks/connector_indexers/base.py @@ -2,6 +2,7 @@ Base functionality and shared imports for connector indexers. """ +import contextlib import logging from datetime import UTC, datetime, timedelta @@ -10,6 +11,8 @@ from sqlalchemy.future import select from app.db import ( Document, + DocumentStatus, + DocumentType, SearchSourceConnector, SearchSourceConnectorType, ) @@ -130,6 +133,59 @@ async def check_document_by_unique_identifier( return existing_doc_result.scalars().first() +async def mark_connector_documents_failed( + session: AsyncSession, + *, + document_type: DocumentType, + search_space_id: int, + failures: list[tuple[str, str]], +) -> int: + """Transition placeholder/in-progress documents to ``failed`` by source id. + + Without this, a document whose download/ETL fails stays stuck in + ``pending``/``processing`` forever: undeletable in the UI and never retried. + + ``failures`` is a list of ``(unique_id, reason)``. Best-effort: never raises, + and leaves ``ready`` documents untouched. Returns the number marked failed. + """ + if not failures: + return 0 + + from app.indexing_pipeline.document_hashing import compute_identifier_hash + + marked = 0 + try: + for unique_id, reason in failures: + if not unique_id: + continue + uid_hash = compute_identifier_hash( + document_type.value, unique_id, search_space_id + ) + existing = await check_document_by_unique_identifier(session, uid_hash) + if existing is None: + continue + if DocumentStatus.is_state(existing.status, DocumentStatus.READY): + continue + existing.status = DocumentStatus.failed(reason) + existing.updated_at = datetime.now(UTC) + marked += 1 + + if marked: + await session.commit() + except Exception: + with contextlib.suppress(Exception): + await session.rollback() + logger.warning( + "Failed to mark %d connector document(s) as failed (type=%s)", + len(failures), + getattr(document_type, "value", document_type), + exc_info=True, + ) + return 0 + + return marked + + async def get_connector_by_id( session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType ) -> SearchSourceConnector | None: diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index 74234a3b9..6471ffb00 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -6,6 +6,7 @@ Implements 2-phase document status updates for real-time UI feedback: - Phase 2: Process each page: pending → processing → ready/failed """ +import contextlib import time from collections.abc import Awaitable, Callable from datetime import datetime @@ -432,10 +433,15 @@ async def index_bookstack_pages( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() skipped_pages.append( f"{item.get('page_name', 'Unknown')} (processing error)" ) diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index 7b40a4b22..91763129f 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -437,10 +437,15 @@ async def index_clickup_tasks( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 1187edd98..53c438197 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -21,6 +21,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -295,6 +296,23 @@ async def index_confluence_pages( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.CONFLUENCE_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + await update_connector_last_indexed(session, connector, update_last_indexed) logger.info( diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 180f21412..8c5bd8f0e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -10,6 +10,7 @@ Uses 2-phase document status updates for real-time UI feedback: """ import asyncio +import contextlib import time from collections.abc import Awaitable, Callable from datetime import UTC, datetime, timedelta @@ -713,10 +714,15 @@ async def index_discord_messages( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py index 6e61bce18..7cd3e1613 100644 --- a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py @@ -26,12 +26,14 @@ from app.connectors.dropbox.file_types import should_skip_file as skip_item from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_hashing import compute_identifier_hash +from app.indexing_pipeline.exceptions import safe_exception_message from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.page_limit_service import PageLimitService from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, get_connector_by_id, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -112,7 +114,12 @@ async def _should_skip_file( logger.info(f"Rename-only update: '{old_name}' -> '{file_name}'") return True, f"File renamed: '{old_name}' -> '{file_name}'" - if not DocumentStatus.is_state(existing.status, DocumentStatus.READY): + state = DocumentStatus.get_state(existing.status) + if state in (DocumentStatus.PENDING, DocumentStatus.PROCESSING): + # Stuck placeholder/in-progress doc (e.g. worker died mid-index): re-index + # instead of skipping, otherwise it never recovers. + return False, None + if state != DocumentStatus.READY: return True, "skipped (previously failed)" return True, "unchanged" @@ -158,15 +165,20 @@ async def _download_files_parallel( max_concurrency: int = 3, on_heartbeat: HeartbeatCallbackType | None = None, vision_llm=None, -) -> tuple[list[ConnectorDocument], int]: - """Download and ETL files in parallel. Returns (docs, failed_count).""" +) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]: + """Download and ETL files in parallel. + + Returns (docs, failed_files), where failed_files is a list of + (file_id, reason) so callers can mark those placeholders failed. + """ results: list[ConnectorDocument] = [] sem = asyncio.Semaphore(max_concurrency) last_heartbeat = time.time() completed_count = 0 hb_lock = asyncio.Lock() - async def _download_one(file: dict) -> ConnectorDocument | None: + async def _download_one(file: dict) -> ConnectorDocument | str: + # ConnectorDocument on success; failure reason string otherwise. nonlocal last_heartbeat, completed_count async with sem: markdown, db_metadata, error = await download_and_extract_content( @@ -176,7 +188,7 @@ async def _download_files_parallel( file_name = file.get("name", "Unknown") reason = error or "empty content" logger.warning(f"Download/ETL failed for {file_name}: {reason}") - return None + return f"Download/ETL failed: {reason}" doc = _build_connector_doc( file, markdown, @@ -197,14 +209,28 @@ async def _download_files_parallel( tasks = [_download_one(f) for f in files] outcomes = await asyncio.gather(*tasks, return_exceptions=True) - failed = 0 - for outcome in outcomes: - if isinstance(outcome, Exception) or outcome is None: - failed += 1 - else: + failed_files: list[tuple[str, str]] = [] + for file, outcome in zip(files, outcomes, strict=False): + if isinstance(outcome, ConnectorDocument): results.append(outcome) + continue + file_id = file.get("id") + if isinstance(outcome, Exception): + reason = f"Download/ETL error: {safe_exception_message(outcome)}" + logger.warning( + "Download/ETL exception for %s: %s", + file.get("name", "Unknown"), + outcome, + exc_info=outcome, + ) + elif isinstance(outcome, str): + reason = outcome + else: + reason = "Download or extraction failed" + if file_id: + failed_files.append((file_id, reason)) - return results, failed + return results, failed_files async def _download_and_index( @@ -219,7 +245,7 @@ async def _download_and_index( vision_llm=None, ) -> tuple[int, int]: """Parallel download then parallel indexing. Returns (batch_indexed, total_failed).""" - connector_docs, download_failed = await _download_files_parallel( + connector_docs, failed_files = await _download_files_parallel( dropbox_client, files, connector_id=connector_id, @@ -229,6 +255,15 @@ async def _download_and_index( vision_llm=vision_llm, ) + # Fail rows for files whose download/ETL failed, so they don't stay stuck. + if failed_files: + await mark_connector_documents_failed( + session, + document_type=DocumentType.DROPBOX_FILE, + search_space_id=search_space_id, + failures=failed_files, + ) + batch_indexed = 0 batch_failed = 0 if connector_docs: @@ -239,7 +274,7 @@ async def _download_and_index( on_heartbeat=on_heartbeat, ) - return batch_indexed, download_failed + batch_failed + return batch_indexed, len(failed_files) + batch_failed async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int): diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 3283b41eb..ba0aa3445 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -6,6 +6,7 @@ Implements 2-phase document status updates for real-time UI feedback: - Phase 2: Process each document: pending → processing → ready/failed """ +import contextlib import json import logging import time @@ -406,10 +407,15 @@ async def index_elasticsearch_documents( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 1d0b004d8..ce9b80e5e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -9,6 +9,7 @@ Implements 2-phase document status updates for real-time UI feedback: - Phase 2: Process each document: pending → processing → ready/failed """ +import contextlib import time from collections.abc import Awaitable, Callable from datetime import UTC, datetime @@ -413,10 +414,15 @@ async def index_github_repos( try: document.status = DocumentStatus.failed(str(repo_err)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() errors.append(f"Failed processing {repo_full_name}: {repo_err}") documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 97f01d68b..51df39171 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -28,6 +28,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, parse_date_flexible, update_connector_last_indexed, ) @@ -448,6 +449,23 @@ async def index_google_calendar_events( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 68c43716b..b76f84bac 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -35,6 +35,7 @@ from app.connectors.google_drive.file_types import ( from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_hashing import compute_identifier_hash +from app.indexing_pipeline.exceptions import safe_exception_message from app.indexing_pipeline.indexing_pipeline_service import ( IndexingPipelineService, PlaceholderInfo, @@ -45,6 +46,7 @@ from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, get_connector_by_id, + mark_connector_documents_failed, update_connector_last_indexed, ) from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES @@ -367,7 +369,12 @@ async def _should_skip_file( logger.info(f"Rename-only update: '{old_name}' → '{file_name}'") return True, f"File renamed: '{old_name}' → '{file_name}'" - if not DocumentStatus.is_state(existing.status, DocumentStatus.READY): + state = DocumentStatus.get_state(existing.status) + if state in (DocumentStatus.PENDING, DocumentStatus.PROCESSING): + # Stuck placeholder/in-progress doc (e.g. worker died mid-index): re-index + # instead of skipping, otherwise it never recovers. + return False, None + if state != DocumentStatus.READY: return True, "skipped (previously failed)" return True, "unchanged" @@ -458,10 +465,11 @@ async def _download_files_parallel( max_concurrency: int = 3, on_heartbeat: HeartbeatCallbackType | None = None, vision_llm=None, -) -> tuple[list[ConnectorDocument], int]: - """Download and ETL files in parallel, returning ConnectorDocuments. +) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]: + """Download and ETL files in parallel. - Returns (connector_docs, download_failed_count). + Returns (connector_docs, failed_files), where failed_files is a list of + (file_id, reason) so callers can mark those placeholders failed. """ results: list[ConnectorDocument] = [] sem = asyncio.Semaphore(max_concurrency) @@ -469,7 +477,8 @@ async def _download_files_parallel( completed_count = 0 hb_lock = asyncio.Lock() - async def _download_one(file: dict) -> ConnectorDocument | None: + async def _download_one(file: dict) -> ConnectorDocument | str: + # ConnectorDocument on success; failure reason string otherwise. nonlocal last_heartbeat, completed_count async with sem: markdown, drive_metadata, error = await download_and_extract_content( @@ -479,7 +488,7 @@ async def _download_files_parallel( file_name = file.get("name", "Unknown") reason = error or "empty content" logger.warning(f"Download/ETL failed for {file_name}: {reason}") - return None + return f"Download/ETL failed: {reason}" doc = _build_connector_doc( file, markdown, @@ -500,14 +509,28 @@ async def _download_files_parallel( tasks = [_download_one(f) for f in files] outcomes = await asyncio.gather(*tasks, return_exceptions=True) - failed = 0 - for outcome in outcomes: - if isinstance(outcome, Exception) or outcome is None: - failed += 1 - else: + failed_files: list[tuple[str, str]] = [] + for file, outcome in zip(files, outcomes, strict=False): + if isinstance(outcome, ConnectorDocument): results.append(outcome) + continue + file_id = file.get("id") + if isinstance(outcome, Exception): + reason = f"Download/ETL error: {safe_exception_message(outcome)}" + logger.warning( + "Download/ETL exception for %s: %s", + file.get("name", "Unknown"), + outcome, + exc_info=outcome, + ) + elif isinstance(outcome, str): + reason = outcome + else: + reason = "Download or extraction failed" + if file_id: + failed_files.append((file_id, reason)) - return results, failed + return results, failed_files async def _process_single_file( @@ -542,7 +565,16 @@ async def _process_single_file( drive_client, file, vision_llm=vision_llm ) if error or not markdown: - logger.warning(f"ETL failed for {file_name}: {error}") + reason = error or "empty content" + logger.warning(f"ETL failed for {file_name}: {reason}") + file_id = file.get("id") + if file_id: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=search_space_id, + failures=[(file_id, f"Download/ETL failed: {reason}")], + ) return 0, 1, 0 doc = _build_connector_doc( @@ -630,7 +662,7 @@ async def _download_and_index( Returns (batch_indexed, total_failed). """ - connector_docs, download_failed = await _download_files_parallel( + connector_docs, failed_files = await _download_files_parallel( drive_client, files, connector_id=connector_id, @@ -640,6 +672,16 @@ async def _download_and_index( vision_llm=vision_llm, ) + # Fail the placeholders for files whose download/ETL failed, so they don't + # stay stuck in 'pending'. + if failed_files: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=search_space_id, + failures=failed_files, + ) + batch_indexed = 0 batch_failed = 0 if connector_docs: @@ -650,7 +692,7 @@ async def _download_and_index( on_heartbeat=on_heartbeat, ) - return batch_indexed, download_failed + batch_failed + return batch_indexed, len(failed_files) + batch_failed async def _index_selected_files( diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 225e3618e..25da96b61 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -29,6 +29,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -479,6 +480,23 @@ async def index_google_gmail_messages( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 12749b82b..2bde77f79 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -25,6 +25,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -303,6 +304,23 @@ async def index_linear_issues( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.LINEAR_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index 9bcba5a37..eab2c9793 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -7,6 +7,7 @@ Implements 2-phase document status updates for real-time UI feedback: """ import asyncio +import contextlib import time from collections.abc import Awaitable, Callable from datetime import datetime, timedelta @@ -485,10 +486,15 @@ async def index_luma_events( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() skipped_events.append( f"{item.get('event_name', 'Unknown')} (processing error)" ) diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 1ca9ca4ba..9ebafbcdb 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -27,6 +27,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -343,6 +344,23 @@ async def index_notion_pages( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.NOTION_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) diff --git a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py index 5d783e497..3fd8a79f2 100644 --- a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py @@ -26,12 +26,14 @@ from app.connectors.onedrive.file_types import should_skip_file as skip_item from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_hashing import compute_identifier_hash +from app.indexing_pipeline.exceptions import safe_exception_message from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.page_limit_service import PageLimitService from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, get_connector_by_id, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -119,7 +121,12 @@ async def _should_skip_file( logger.info(f"Rename-only update: '{old_name}' -> '{file_name}'") return True, f"File renamed: '{old_name}' -> '{file_name}'" - if not DocumentStatus.is_state(existing.status, DocumentStatus.READY): + state = DocumentStatus.get_state(existing.status) + if state in (DocumentStatus.PENDING, DocumentStatus.PROCESSING): + # Stuck placeholder/in-progress doc (e.g. worker died mid-index): re-index + # instead of skipping, otherwise it never recovers. + return False, None + if state != DocumentStatus.READY: return True, "skipped (previously failed)" return True, "unchanged" @@ -165,15 +172,20 @@ async def _download_files_parallel( max_concurrency: int = 3, on_heartbeat: HeartbeatCallbackType | None = None, vision_llm=None, -) -> tuple[list[ConnectorDocument], int]: - """Download and ETL files in parallel. Returns (docs, failed_count).""" +) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]: + """Download and ETL files in parallel. + + Returns (docs, failed_files), where failed_files is a list of + (file_id, reason) so callers can mark those placeholders failed. + """ results: list[ConnectorDocument] = [] sem = asyncio.Semaphore(max_concurrency) last_heartbeat = time.time() completed_count = 0 hb_lock = asyncio.Lock() - async def _download_one(file: dict) -> ConnectorDocument | None: + async def _download_one(file: dict) -> ConnectorDocument | str: + # ConnectorDocument on success; failure reason string otherwise. nonlocal last_heartbeat, completed_count async with sem: markdown, od_metadata, error = await download_and_extract_content( @@ -183,7 +195,7 @@ async def _download_files_parallel( file_name = file.get("name", "Unknown") reason = error or "empty content" logger.warning(f"Download/ETL failed for {file_name}: {reason}") - return None + return f"Download/ETL failed: {reason}" doc = _build_connector_doc( file, markdown, @@ -204,14 +216,28 @@ async def _download_files_parallel( tasks = [_download_one(f) for f in files] outcomes = await asyncio.gather(*tasks, return_exceptions=True) - failed = 0 - for outcome in outcomes: - if isinstance(outcome, Exception) or outcome is None: - failed += 1 - else: + failed_files: list[tuple[str, str]] = [] + for file, outcome in zip(files, outcomes, strict=False): + if isinstance(outcome, ConnectorDocument): results.append(outcome) + continue + file_id = file.get("id") + if isinstance(outcome, Exception): + reason = f"Download/ETL error: {safe_exception_message(outcome)}" + logger.warning( + "Download/ETL exception for %s: %s", + file.get("name", "Unknown"), + outcome, + exc_info=outcome, + ) + elif isinstance(outcome, str): + reason = outcome + else: + reason = "Download or extraction failed" + if file_id: + failed_files.append((file_id, reason)) - return results, failed + return results, failed_files async def _download_and_index( @@ -226,7 +252,7 @@ async def _download_and_index( vision_llm=None, ) -> tuple[int, int]: """Parallel download then parallel indexing. Returns (batch_indexed, total_failed).""" - connector_docs, download_failed = await _download_files_parallel( + connector_docs, failed_files = await _download_files_parallel( onedrive_client, files, connector_id=connector_id, @@ -236,6 +262,15 @@ async def _download_and_index( vision_llm=vision_llm, ) + # Fail rows for files whose download/ETL failed, so they don't stay stuck. + if failed_files: + await mark_connector_documents_failed( + session, + document_type=DocumentType.ONEDRIVE_FILE, + search_space_id=search_space_id, + failures=failed_files, + ) + batch_indexed = 0 batch_failed = 0 if connector_docs: @@ -246,7 +281,7 @@ async def _download_and_index( on_heartbeat=on_heartbeat, ) - return batch_indexed, download_failed + batch_failed + return batch_indexed, len(failed_files) + batch_failed async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int): diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index 2c6d0e11e..ac63af38c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -9,6 +9,7 @@ Uses 2-phase document status updates for real-time UI feedback: - Phase 2: Process each document: pending → processing → ready/failed """ +import contextlib import time from collections.abc import Awaitable, Callable from datetime import datetime @@ -586,10 +587,15 @@ async def index_slack_messages( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 25994895a..e48aedaa5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -10,6 +10,7 @@ Uses 2-phase document status updates for real-time UI feedback: """ import asyncio +import contextlib import time from collections.abc import Awaitable, Callable from datetime import UTC, datetime @@ -630,11 +631,16 @@ async def index_teams_messages( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( "Failed to update document status to failed: %s", str(status_error), ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py index c7565f4ba..6e85421ea 100644 --- a/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py @@ -177,3 +177,75 @@ async def test_should_skip_file_skips_failed_document( assert should_skip, "FAILED documents must be skipped during automatic sync" assert "failed" in msg.lower() + + +@pytest.mark.parametrize("stuck_state", ["pending", "processing"]) +async def test_should_skip_file_retries_stuck_document( + db_session, + db_search_space, + db_user, + stuck_state, +): + """A doc stuck in pending/processing (worker died mid-index) must re-index, not skip.""" + import importlib + import sys + import types + + pkg = "app.tasks.connector_indexers" + stub = pkg not in sys.modules + if stub: + mod = types.ModuleType(pkg) + mod.__path__ = ["app/tasks/connector_indexers"] + mod.__package__ = pkg + sys.modules[pkg] = mod + + try: + gdm = importlib.import_module( + "app.tasks.connector_indexers.google_drive_indexer" + ) + _should_skip_file = gdm._should_skip_file + finally: + if stub: + sys.modules.pop(pkg, None) + + space_id = db_search_space.id + file_id = f"file-{stuck_state}-drive" + md5 = "stuck123checksum" + + doc_hash = compute_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE.value, file_id, space_id + ) + status = ( + DocumentStatus.pending() + if stuck_state == "pending" + else DocumentStatus.processing() + ) + stuck_doc = Document( + title="Stuck File.pdf", + document_type=DocumentType.GOOGLE_DRIVE_FILE, + content="Pending...", + content_hash=f"ch-{doc_hash[:12]}", + unique_identifier_hash=doc_hash, + source_markdown="", + search_space_id=space_id, + created_by_id=str(db_user.id), + status=status, + document_metadata={ + "google_drive_file_id": file_id, + "google_drive_file_name": "Stuck File.pdf", + "md5_checksum": md5, + }, + ) + db_session.add(stuck_doc) + await db_session.flush() + + incoming_file = { + "id": file_id, + "name": "Stuck File.pdf", + "mimeType": "application/pdf", + "md5Checksum": md5, + } + + should_skip, _msg = await _should_skip_file(db_session, incoming_file, space_id) + + assert not should_skip, f"{stuck_state} documents must re-index, not be skipped" diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_mark_connector_documents_failed.py b/surfsense_backend/tests/integration/indexing_pipeline/test_mark_connector_documents_failed.py new file mode 100644 index 000000000..9e3feee1e --- /dev/null +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_mark_connector_documents_failed.py @@ -0,0 +1,110 @@ +"""Integration tests for mark_connector_documents_failed. + +Covers the ETL-failure recovery path: a connector placeholder must move out of +``pending``/``processing`` into ``failed`` so it stays deletable, while a +``ready`` document is never clobbered. +""" + +import hashlib + +import pytest +from sqlalchemy import select + +from app.db import Document, DocumentStatus, DocumentType +from app.indexing_pipeline.document_hashing import compute_identifier_hash +from app.tasks.connector_indexers.base import mark_connector_documents_failed + +pytestmark = pytest.mark.integration + + +async def _make_doc( + db_session, + *, + search_space_id: int, + connector_id: int, + user_id: str, + file_id: str, + status: dict, +) -> Document: + uid_hash = compute_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE.value, file_id, search_space_id + ) + doc = Document( + title=f"{file_id}.pdf", + document_type=DocumentType.GOOGLE_DRIVE_FILE, + content="Pending...", + content_hash=hashlib.sha256(f"placeholder:{uid_hash}".encode()).hexdigest(), + unique_identifier_hash=uid_hash, + document_metadata={"google_drive_file_id": file_id}, + search_space_id=search_space_id, + connector_id=connector_id, + created_by_id=user_id, + status=status, + ) + db_session.add(doc) + await db_session.flush() + return doc + + +async def test_pending_placeholder_marked_failed( + db_session, db_search_space, db_connector, db_user +): + doc = await _make_doc( + db_session, + search_space_id=db_search_space.id, + connector_id=db_connector.id, + user_id=str(db_user.id), + file_id="file-pending", + status=DocumentStatus.pending(), + ) + + marked = await mark_connector_documents_failed( + db_session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=db_search_space.id, + failures=[("file-pending", "Download/ETL failed: boom")], + ) + + assert marked == 1 + await db_session.refresh(doc) + assert DocumentStatus.is_state(doc.status, DocumentStatus.FAILED) + assert doc.status.get("reason") == "Download/ETL failed: boom" + + +async def test_ready_document_not_clobbered( + db_session, db_search_space, db_connector, db_user +): + doc = await _make_doc( + db_session, + search_space_id=db_search_space.id, + connector_id=db_connector.id, + user_id=str(db_user.id), + file_id="file-ready", + status=DocumentStatus.ready(), + ) + + marked = await mark_connector_documents_failed( + db_session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=db_search_space.id, + failures=[("file-ready", "should be ignored")], + ) + + assert marked == 0 + await db_session.refresh(doc) + assert DocumentStatus.is_state(doc.status, DocumentStatus.READY) + + +async def test_missing_document_is_noop(db_session, db_search_space): + marked = await mark_connector_documents_failed( + db_session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=db_search_space.id, + failures=[("does-not-exist", "reason")], + ) + + assert marked == 0 + result = await db_session.execute( + select(Document).filter(Document.search_space_id == db_search_space.id) + ) + assert result.scalars().first() is None diff --git a/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py index 694caed06..b87d1be42 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py @@ -74,7 +74,7 @@ async def test_single_file_returns_one_connector_document( ) assert len(docs) == 1 - assert failed == 0 + assert failed == [] assert docs[0].title == "test.txt" assert docs[0].unique_id == "f1" assert docs[0].document_type == DocumentType.DROPBOX_FILE @@ -99,7 +99,7 @@ async def test_multiple_files_all_produce_documents( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert {d.unique_id for d in docs} == {"f0", "f1", "f2"} @@ -126,7 +126,7 @@ async def test_one_download_exception_does_not_block_others( ) assert len(docs) == 2 - assert failed == 1 + assert len(failed) == 1 assert {d.unique_id for d in docs} == {"f0", "f2"} @@ -152,7 +152,7 @@ async def test_etl_error_counts_as_download_failure( ) assert len(docs) == 1 - assert failed == 1 + assert len(failed) == 1 # Slice 5: Semaphore bound @@ -191,7 +191,7 @@ async def test_concurrency_bounded_by_semaphore( ) assert len(docs) == 6 - assert failed == 0 + assert failed == [] assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2" @@ -230,7 +230,7 @@ async def test_heartbeat_fires_during_parallel_downloads( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once" diff --git a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py index 65be05593..9a13e4525 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py @@ -69,7 +69,7 @@ async def test_single_file_returns_one_connector_document( ) assert len(docs) == 1 - assert failed == 0 + assert failed == [] assert docs[0].title == "test.txt" assert docs[0].unique_id == "f1" @@ -93,7 +93,7 @@ async def test_multiple_files_all_produce_documents( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert {d.unique_id for d in docs} == {"f0", "f1", "f2"} @@ -120,7 +120,7 @@ async def test_one_download_exception_does_not_block_others( ) assert len(docs) == 2 - assert failed == 1 + assert len(failed) == 1 assert {d.unique_id for d in docs} == {"f0", "f2"} @@ -146,7 +146,7 @@ async def test_etl_error_counts_as_download_failure( ) assert len(docs) == 1 - assert failed == 1 + assert len(failed) == 1 async def test_concurrency_bounded_by_semaphore( @@ -186,7 +186,7 @@ async def test_concurrency_bounded_by_semaphore( ) assert len(docs) == 6 - assert failed == 0 + assert failed == [] assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2" @@ -225,7 +225,7 @@ async def test_heartbeat_fires_during_parallel_downloads( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once" @@ -281,7 +281,7 @@ def full_scan_mocks(mock_drive_client, monkeypatch): monkeypatch.setattr(_mod, "_should_skip_file", _fake_skip) - download_mock = AsyncMock(return_value=([], 0)) + download_mock = AsyncMock(return_value=([], [])) monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) batch_mock = AsyncMock(return_value=([], 0, 0)) @@ -350,7 +350,7 @@ async def test_full_scan_three_phase_counts(full_scan_mocks, monkeypatch): ) mock_docs = [MagicMock(), MagicMock()] - full_scan_mocks["download_mock"].return_value = (mock_docs, 0) + full_scan_mocks["download_mock"].return_value = (mock_docs, []) full_scan_mocks["batch_mock"].return_value = ([], 2, 0) indexed, skipped, _unsupported = await _run_full_scan(full_scan_mocks) @@ -376,7 +376,7 @@ async def test_full_scan_respects_max_files(full_scan_mocks, monkeypatch): AsyncMock(return_value=(page_files, None, None)), ) - full_scan_mocks["download_mock"].return_value = ([], 0) + full_scan_mocks["download_mock"].return_value = ([], []) full_scan_mocks["batch_mock"].return_value = ([], 0, 0) await _run_full_scan(full_scan_mocks, max_files=3) @@ -400,7 +400,7 @@ async def test_full_scan_uses_max_concurrency_3_for_indexing( ) mock_docs = [MagicMock()] - full_scan_mocks["download_mock"].return_value = (mock_docs, 0) + full_scan_mocks["download_mock"].return_value = (mock_docs, []) full_scan_mocks["batch_mock"].return_value = ([], 1, 0) await _run_full_scan(full_scan_mocks) @@ -462,7 +462,7 @@ async def test_delta_sync_removals_serial_rest_parallel(monkeypatch): ) mock_docs = [MagicMock(), MagicMock()] - download_mock = AsyncMock(return_value=(mock_docs, 0)) + download_mock = AsyncMock(return_value=(mock_docs, [])) monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) batch_mock = AsyncMock(return_value=([], 2, 0)) diff --git a/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py index eb1451938..01e81da17 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py @@ -68,7 +68,7 @@ async def test_single_file_returns_one_connector_document( ) assert len(docs) == 1 - assert failed == 0 + assert failed == [] assert docs[0].title == "test.txt" assert docs[0].unique_id == "f1" assert docs[0].document_type == DocumentType.ONEDRIVE_FILE @@ -93,7 +93,7 @@ async def test_multiple_files_all_produce_documents( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert {d.unique_id for d in docs} == {"f0", "f1", "f2"} @@ -120,7 +120,7 @@ async def test_one_download_exception_does_not_block_others( ) assert len(docs) == 2 - assert failed == 1 + assert len(failed) == 1 assert {d.unique_id for d in docs} == {"f0", "f2"} @@ -146,7 +146,7 @@ async def test_etl_error_counts_as_download_failure( ) assert len(docs) == 1 - assert failed == 1 + assert len(failed) == 1 # Slice 5: Semaphore bound @@ -185,7 +185,7 @@ async def test_concurrency_bounded_by_semaphore( ) assert len(docs) == 6 - assert failed == 0 + assert failed == [] assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2" @@ -224,5 +224,5 @@ async def test_heartbeat_fires_during_parallel_downloads( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once" diff --git a/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py b/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py index a79ed7858..66722ffd7 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py @@ -325,7 +325,7 @@ def gdrive_full_scan_mocks(monkeypatch): _mod, "_should_skip_file", AsyncMock(return_value=(False, None)) ) - download_mock = AsyncMock(return_value=([], 0)) + download_mock = AsyncMock(return_value=([], [])) monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) batch_mock = AsyncMock(return_value=([], 0, 0)) @@ -377,7 +377,7 @@ async def test_gdrive_full_scan_skips_over_quota(gdrive_full_scan_mocks, monkeyp "get_files_in_folder", AsyncMock(return_value=(page_files, None, None)), ) - m["download_mock"].return_value = ([], 0) + m["download_mock"].return_value = ([], []) m["batch_mock"].return_value = ([], 2, 0) _indexed, skipped, _unsup = await _run_gdrive_full_scan(m) @@ -403,7 +403,7 @@ async def test_gdrive_full_scan_deducts_after_indexing( AsyncMock(return_value=(page_files, None, None)), ) mock_docs = [MagicMock() for _ in range(3)] - m["download_mock"].return_value = (mock_docs, 0) + m["download_mock"].return_value = (mock_docs, []) m["batch_mock"].return_value = ([], 3, 0) await _run_gdrive_full_scan(m) @@ -438,7 +438,7 @@ async def test_gdrive_delta_sync_skips_over_quota(monkeypatch): _mod, "_should_skip_file", AsyncMock(return_value=(False, None)) ) - download_mock = AsyncMock(return_value=([], 0)) + download_mock = AsyncMock(return_value=([], [])) monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) batch_mock = AsyncMock(return_value=([], 2, 0))