From cb10882dc8908bd0062459f041b675c39c7fd274 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH 01/28] feat(indexers): add mark_connector_documents_failed helper --- .../app/tasks/connector_indexers/base.py | 56 +++++++++++++++++++ 1 file changed, 56 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/base.py b/surfsense_backend/app/tasks/connector_indexers/base.py index ffc8ab72e..9408874ca 100644 --- a/surfsense_backend/app/tasks/connector_indexers/base.py +++ b/surfsense_backend/app/tasks/connector_indexers/base.py @@ -2,6 +2,7 @@ Base functionality and shared imports for connector indexers. """ +import contextlib import logging from datetime import UTC, datetime, timedelta @@ -10,6 +11,8 @@ from sqlalchemy.future import select from app.db import ( Document, + DocumentStatus, + DocumentType, SearchSourceConnector, SearchSourceConnectorType, ) @@ -130,6 +133,59 @@ async def check_document_by_unique_identifier( return existing_doc_result.scalars().first() +async def mark_connector_documents_failed( + session: AsyncSession, + *, + document_type: DocumentType, + search_space_id: int, + failures: list[tuple[str, str]], +) -> int: + """Transition placeholder/in-progress documents to ``failed`` by source id. + + Without this, a document whose download/ETL fails stays stuck in + ``pending``/``processing`` forever: undeletable in the UI and never retried. + + ``failures`` is a list of ``(unique_id, reason)``. Best-effort: never raises, + and leaves ``ready`` documents untouched. Returns the number marked failed. + """ + if not failures: + return 0 + + from app.indexing_pipeline.document_hashing import compute_identifier_hash + + marked = 0 + try: + for unique_id, reason in failures: + if not unique_id: + continue + uid_hash = compute_identifier_hash( + document_type.value, unique_id, search_space_id + ) + existing = await check_document_by_unique_identifier(session, uid_hash) + if existing is None: + continue + if DocumentStatus.is_state(existing.status, DocumentStatus.READY): + continue + existing.status = DocumentStatus.failed(reason) + existing.updated_at = datetime.now(UTC) + marked += 1 + + if marked: + await session.commit() + except Exception: + with contextlib.suppress(Exception): + await session.rollback() + logger.warning( + "Failed to mark %d connector document(s) as failed (type=%s)", + len(failures), + getattr(document_type, "value", document_type), + exc_info=True, + ) + return 0 + + return marked + + async def get_connector_by_id( session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType ) -> SearchSourceConnector | None: From 6fd95f82b42e356efb869d27cfbea559ff5dc09b Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH 02/28] fix(google-drive): mark placeholders failed on ETL failure --- .../google_drive_indexer.py | 64 +++++++++++++++---- 1 file changed, 50 insertions(+), 14 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 68c43716b..29550c215 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -45,6 +45,7 @@ from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, get_connector_by_id, + mark_connector_documents_failed, update_connector_last_indexed, ) from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES @@ -458,10 +459,11 @@ async def _download_files_parallel( max_concurrency: int = 3, on_heartbeat: HeartbeatCallbackType | None = None, vision_llm=None, -) -> tuple[list[ConnectorDocument], int]: - """Download and ETL files in parallel, returning ConnectorDocuments. +) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]: + """Download and ETL files in parallel. - Returns (connector_docs, download_failed_count). + Returns (connector_docs, failed_files), where failed_files is a list of + (file_id, reason) so callers can mark those placeholders failed. """ results: list[ConnectorDocument] = [] sem = asyncio.Semaphore(max_concurrency) @@ -469,7 +471,8 @@ async def _download_files_parallel( completed_count = 0 hb_lock = asyncio.Lock() - async def _download_one(file: dict) -> ConnectorDocument | None: + async def _download_one(file: dict) -> ConnectorDocument | str: + # ConnectorDocument on success; failure reason string otherwise. nonlocal last_heartbeat, completed_count async with sem: markdown, drive_metadata, error = await download_and_extract_content( @@ -479,7 +482,7 @@ async def _download_files_parallel( file_name = file.get("name", "Unknown") reason = error or "empty content" logger.warning(f"Download/ETL failed for {file_name}: {reason}") - return None + return f"Download/ETL failed: {reason}" doc = _build_connector_doc( file, markdown, @@ -500,14 +503,28 @@ async def _download_files_parallel( tasks = [_download_one(f) for f in files] outcomes = await asyncio.gather(*tasks, return_exceptions=True) - failed = 0 - for outcome in outcomes: - if isinstance(outcome, Exception) or outcome is None: - failed += 1 - else: + failed_files: list[tuple[str, str]] = [] + for file, outcome in zip(files, outcomes, strict=False): + if isinstance(outcome, ConnectorDocument): results.append(outcome) + continue + file_id = file.get("id") + if isinstance(outcome, Exception): + reason = f"Download/ETL error: {outcome}" + logger.warning( + "Download/ETL exception for %s: %s", + file.get("name", "Unknown"), + outcome, + exc_info=outcome, + ) + elif isinstance(outcome, str): + reason = outcome + else: + reason = "Download or extraction failed" + if file_id: + failed_files.append((file_id, reason)) - return results, failed + return results, failed_files async def _process_single_file( @@ -542,7 +559,16 @@ async def _process_single_file( drive_client, file, vision_llm=vision_llm ) if error or not markdown: - logger.warning(f"ETL failed for {file_name}: {error}") + reason = error or "empty content" + logger.warning(f"ETL failed for {file_name}: {reason}") + file_id = file.get("id") + if file_id: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=search_space_id, + failures=[(file_id, f"Download/ETL failed: {reason}")], + ) return 0, 1, 0 doc = _build_connector_doc( @@ -630,7 +656,7 @@ async def _download_and_index( Returns (batch_indexed, total_failed). """ - connector_docs, download_failed = await _download_files_parallel( + connector_docs, failed_files = await _download_files_parallel( drive_client, files, connector_id=connector_id, @@ -640,6 +666,16 @@ async def _download_and_index( vision_llm=vision_llm, ) + # Fail the placeholders for files whose download/ETL failed, so they don't + # stay stuck in 'pending'. + if failed_files: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=search_space_id, + failures=failed_files, + ) + batch_indexed = 0 batch_failed = 0 if connector_docs: @@ -650,7 +686,7 @@ async def _download_and_index( on_heartbeat=on_heartbeat, ) - return batch_indexed, download_failed + batch_failed + return batch_indexed, len(failed_files) + batch_failed async def _index_selected_files( From 82aaaa5a9f53810030454c0de105d79a47a1b47f Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH 03/28] fix(onedrive): mark documents failed on ETL failure --- .../connector_indexers/onedrive_indexer.py | 53 ++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py index 5d783e497..f98b330d7 100644 --- a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py @@ -32,6 +32,7 @@ from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, get_connector_by_id, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -165,15 +166,20 @@ async def _download_files_parallel( max_concurrency: int = 3, on_heartbeat: HeartbeatCallbackType | None = None, vision_llm=None, -) -> tuple[list[ConnectorDocument], int]: - """Download and ETL files in parallel. Returns (docs, failed_count).""" +) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]: + """Download and ETL files in parallel. + + Returns (docs, failed_files), where failed_files is a list of + (file_id, reason) so callers can mark those placeholders failed. + """ results: list[ConnectorDocument] = [] sem = asyncio.Semaphore(max_concurrency) last_heartbeat = time.time() completed_count = 0 hb_lock = asyncio.Lock() - async def _download_one(file: dict) -> ConnectorDocument | None: + async def _download_one(file: dict) -> ConnectorDocument | str: + # ConnectorDocument on success; failure reason string otherwise. nonlocal last_heartbeat, completed_count async with sem: markdown, od_metadata, error = await download_and_extract_content( @@ -183,7 +189,7 @@ async def _download_files_parallel( file_name = file.get("name", "Unknown") reason = error or "empty content" logger.warning(f"Download/ETL failed for {file_name}: {reason}") - return None + return f"Download/ETL failed: {reason}" doc = _build_connector_doc( file, markdown, @@ -204,14 +210,28 @@ async def _download_files_parallel( tasks = [_download_one(f) for f in files] outcomes = await asyncio.gather(*tasks, return_exceptions=True) - failed = 0 - for outcome in outcomes: - if isinstance(outcome, Exception) or outcome is None: - failed += 1 - else: + failed_files: list[tuple[str, str]] = [] + for file, outcome in zip(files, outcomes, strict=False): + if isinstance(outcome, ConnectorDocument): results.append(outcome) + continue + file_id = file.get("id") + if isinstance(outcome, Exception): + reason = f"Download/ETL error: {outcome}" + logger.warning( + "Download/ETL exception for %s: %s", + file.get("name", "Unknown"), + outcome, + exc_info=outcome, + ) + elif isinstance(outcome, str): + reason = outcome + else: + reason = "Download or extraction failed" + if file_id: + failed_files.append((file_id, reason)) - return results, failed + return results, failed_files async def _download_and_index( @@ -226,7 +246,7 @@ async def _download_and_index( vision_llm=None, ) -> tuple[int, int]: """Parallel download then parallel indexing. Returns (batch_indexed, total_failed).""" - connector_docs, download_failed = await _download_files_parallel( + connector_docs, failed_files = await _download_files_parallel( onedrive_client, files, connector_id=connector_id, @@ -236,6 +256,15 @@ async def _download_and_index( vision_llm=vision_llm, ) + # Fail rows for files whose download/ETL failed, so they don't stay stuck. + if failed_files: + await mark_connector_documents_failed( + session, + document_type=DocumentType.ONEDRIVE_FILE, + search_space_id=search_space_id, + failures=failed_files, + ) + batch_indexed = 0 batch_failed = 0 if connector_docs: @@ -246,7 +275,7 @@ async def _download_and_index( on_heartbeat=on_heartbeat, ) - return batch_indexed, download_failed + batch_failed + return batch_indexed, len(failed_files) + batch_failed async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int): From e45e8389dc95300c70ac09618b3887d82e186a6f Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH 04/28] fix(dropbox): mark documents failed on ETL failure --- .../connector_indexers/dropbox_indexer.py | 53 ++++++++++++++----- 1 file changed, 41 insertions(+), 12 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py index 6e61bce18..76fa34159 100644 --- a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py @@ -32,6 +32,7 @@ from app.services.task_logging_service import TaskLoggingService from app.tasks.connector_indexers.base import ( check_document_by_unique_identifier, get_connector_by_id, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -158,15 +159,20 @@ async def _download_files_parallel( max_concurrency: int = 3, on_heartbeat: HeartbeatCallbackType | None = None, vision_llm=None, -) -> tuple[list[ConnectorDocument], int]: - """Download and ETL files in parallel. Returns (docs, failed_count).""" +) -> tuple[list[ConnectorDocument], list[tuple[str, str]]]: + """Download and ETL files in parallel. + + Returns (docs, failed_files), where failed_files is a list of + (file_id, reason) so callers can mark those placeholders failed. + """ results: list[ConnectorDocument] = [] sem = asyncio.Semaphore(max_concurrency) last_heartbeat = time.time() completed_count = 0 hb_lock = asyncio.Lock() - async def _download_one(file: dict) -> ConnectorDocument | None: + async def _download_one(file: dict) -> ConnectorDocument | str: + # ConnectorDocument on success; failure reason string otherwise. nonlocal last_heartbeat, completed_count async with sem: markdown, db_metadata, error = await download_and_extract_content( @@ -176,7 +182,7 @@ async def _download_files_parallel( file_name = file.get("name", "Unknown") reason = error or "empty content" logger.warning(f"Download/ETL failed for {file_name}: {reason}") - return None + return f"Download/ETL failed: {reason}" doc = _build_connector_doc( file, markdown, @@ -197,14 +203,28 @@ async def _download_files_parallel( tasks = [_download_one(f) for f in files] outcomes = await asyncio.gather(*tasks, return_exceptions=True) - failed = 0 - for outcome in outcomes: - if isinstance(outcome, Exception) or outcome is None: - failed += 1 - else: + failed_files: list[tuple[str, str]] = [] + for file, outcome in zip(files, outcomes, strict=False): + if isinstance(outcome, ConnectorDocument): results.append(outcome) + continue + file_id = file.get("id") + if isinstance(outcome, Exception): + reason = f"Download/ETL error: {outcome}" + logger.warning( + "Download/ETL exception for %s: %s", + file.get("name", "Unknown"), + outcome, + exc_info=outcome, + ) + elif isinstance(outcome, str): + reason = outcome + else: + reason = "Download or extraction failed" + if file_id: + failed_files.append((file_id, reason)) - return results, failed + return results, failed_files async def _download_and_index( @@ -219,7 +239,7 @@ async def _download_and_index( vision_llm=None, ) -> tuple[int, int]: """Parallel download then parallel indexing. Returns (batch_indexed, total_failed).""" - connector_docs, download_failed = await _download_files_parallel( + connector_docs, failed_files = await _download_files_parallel( dropbox_client, files, connector_id=connector_id, @@ -229,6 +249,15 @@ async def _download_and_index( vision_llm=vision_llm, ) + # Fail rows for files whose download/ETL failed, so they don't stay stuck. + if failed_files: + await mark_connector_documents_failed( + session, + document_type=DocumentType.DROPBOX_FILE, + search_space_id=search_space_id, + failures=failed_files, + ) + batch_indexed = 0 batch_failed = 0 if connector_docs: @@ -239,7 +268,7 @@ async def _download_and_index( on_heartbeat=on_heartbeat, ) - return batch_indexed, download_failed + batch_failed + return batch_indexed, len(failed_files) + batch_failed async def _remove_document(session: AsyncSession, file_id: str, search_space_id: int): From 5f59ad3ad3ac5ecb092e79b2287fa9662d52c285 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH 05/28] test(google-drive): update download failure return shape --- .../test_google_drive_parallel.py | 22 +++++++++---------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py index 65be05593..9a13e4525 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_google_drive_parallel.py @@ -69,7 +69,7 @@ async def test_single_file_returns_one_connector_document( ) assert len(docs) == 1 - assert failed == 0 + assert failed == [] assert docs[0].title == "test.txt" assert docs[0].unique_id == "f1" @@ -93,7 +93,7 @@ async def test_multiple_files_all_produce_documents( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert {d.unique_id for d in docs} == {"f0", "f1", "f2"} @@ -120,7 +120,7 @@ async def test_one_download_exception_does_not_block_others( ) assert len(docs) == 2 - assert failed == 1 + assert len(failed) == 1 assert {d.unique_id for d in docs} == {"f0", "f2"} @@ -146,7 +146,7 @@ async def test_etl_error_counts_as_download_failure( ) assert len(docs) == 1 - assert failed == 1 + assert len(failed) == 1 async def test_concurrency_bounded_by_semaphore( @@ -186,7 +186,7 @@ async def test_concurrency_bounded_by_semaphore( ) assert len(docs) == 6 - assert failed == 0 + assert failed == [] assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2" @@ -225,7 +225,7 @@ async def test_heartbeat_fires_during_parallel_downloads( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once" @@ -281,7 +281,7 @@ def full_scan_mocks(mock_drive_client, monkeypatch): monkeypatch.setattr(_mod, "_should_skip_file", _fake_skip) - download_mock = AsyncMock(return_value=([], 0)) + download_mock = AsyncMock(return_value=([], [])) monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) batch_mock = AsyncMock(return_value=([], 0, 0)) @@ -350,7 +350,7 @@ async def test_full_scan_three_phase_counts(full_scan_mocks, monkeypatch): ) mock_docs = [MagicMock(), MagicMock()] - full_scan_mocks["download_mock"].return_value = (mock_docs, 0) + full_scan_mocks["download_mock"].return_value = (mock_docs, []) full_scan_mocks["batch_mock"].return_value = ([], 2, 0) indexed, skipped, _unsupported = await _run_full_scan(full_scan_mocks) @@ -376,7 +376,7 @@ async def test_full_scan_respects_max_files(full_scan_mocks, monkeypatch): AsyncMock(return_value=(page_files, None, None)), ) - full_scan_mocks["download_mock"].return_value = ([], 0) + full_scan_mocks["download_mock"].return_value = ([], []) full_scan_mocks["batch_mock"].return_value = ([], 0, 0) await _run_full_scan(full_scan_mocks, max_files=3) @@ -400,7 +400,7 @@ async def test_full_scan_uses_max_concurrency_3_for_indexing( ) mock_docs = [MagicMock()] - full_scan_mocks["download_mock"].return_value = (mock_docs, 0) + full_scan_mocks["download_mock"].return_value = (mock_docs, []) full_scan_mocks["batch_mock"].return_value = ([], 1, 0) await _run_full_scan(full_scan_mocks) @@ -462,7 +462,7 @@ async def test_delta_sync_removals_serial_rest_parallel(monkeypatch): ) mock_docs = [MagicMock(), MagicMock()] - download_mock = AsyncMock(return_value=(mock_docs, 0)) + download_mock = AsyncMock(return_value=(mock_docs, [])) monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) batch_mock = AsyncMock(return_value=([], 2, 0)) From b5aa41beb6afce873b55ca2ec77605536651ea7a Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH 06/28] test(onedrive): update download failure return shape --- .../connector_indexers/test_onedrive_parallel.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py index eb1451938..01e81da17 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_onedrive_parallel.py @@ -68,7 +68,7 @@ async def test_single_file_returns_one_connector_document( ) assert len(docs) == 1 - assert failed == 0 + assert failed == [] assert docs[0].title == "test.txt" assert docs[0].unique_id == "f1" assert docs[0].document_type == DocumentType.ONEDRIVE_FILE @@ -93,7 +93,7 @@ async def test_multiple_files_all_produce_documents( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert {d.unique_id for d in docs} == {"f0", "f1", "f2"} @@ -120,7 +120,7 @@ async def test_one_download_exception_does_not_block_others( ) assert len(docs) == 2 - assert failed == 1 + assert len(failed) == 1 assert {d.unique_id for d in docs} == {"f0", "f2"} @@ -146,7 +146,7 @@ async def test_etl_error_counts_as_download_failure( ) assert len(docs) == 1 - assert failed == 1 + assert len(failed) == 1 # Slice 5: Semaphore bound @@ -185,7 +185,7 @@ async def test_concurrency_bounded_by_semaphore( ) assert len(docs) == 6 - assert failed == 0 + assert failed == [] assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2" @@ -224,5 +224,5 @@ async def test_heartbeat_fires_during_parallel_downloads( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once" From bdd3728c5b7318f6ea4e3e971a7abed4e86786de Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH 07/28] test(dropbox): update download failure return shape --- .../unit/connector_indexers/test_dropbox_parallel.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py b/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py index 694caed06..b87d1be42 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_dropbox_parallel.py @@ -74,7 +74,7 @@ async def test_single_file_returns_one_connector_document( ) assert len(docs) == 1 - assert failed == 0 + assert failed == [] assert docs[0].title == "test.txt" assert docs[0].unique_id == "f1" assert docs[0].document_type == DocumentType.DROPBOX_FILE @@ -99,7 +99,7 @@ async def test_multiple_files_all_produce_documents( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert {d.unique_id for d in docs} == {"f0", "f1", "f2"} @@ -126,7 +126,7 @@ async def test_one_download_exception_does_not_block_others( ) assert len(docs) == 2 - assert failed == 1 + assert len(failed) == 1 assert {d.unique_id for d in docs} == {"f0", "f2"} @@ -152,7 +152,7 @@ async def test_etl_error_counts_as_download_failure( ) assert len(docs) == 1 - assert failed == 1 + assert len(failed) == 1 # Slice 5: Semaphore bound @@ -191,7 +191,7 @@ async def test_concurrency_bounded_by_semaphore( ) assert len(docs) == 6 - assert failed == 0 + assert failed == [] assert peak <= 2, f"Peak concurrency was {peak}, expected <= 2" @@ -230,7 +230,7 @@ async def test_heartbeat_fires_during_parallel_downloads( ) assert len(docs) == 3 - assert failed == 0 + assert failed == [] assert len(heartbeat_calls) >= 1, "Heartbeat should have fired at least once" From 9f76daec8ff59ea863ebf6082e6497304531994b Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Tue, 9 Jun 2026 23:39:25 +0200 Subject: [PATCH 08/28] test(indexers): update download mock return shape --- .../tests/unit/connector_indexers/test_page_limits.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py b/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py index a79ed7858..66722ffd7 100644 --- a/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py +++ b/surfsense_backend/tests/unit/connector_indexers/test_page_limits.py @@ -325,7 +325,7 @@ def gdrive_full_scan_mocks(monkeypatch): _mod, "_should_skip_file", AsyncMock(return_value=(False, None)) ) - download_mock = AsyncMock(return_value=([], 0)) + download_mock = AsyncMock(return_value=([], [])) monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) batch_mock = AsyncMock(return_value=([], 0, 0)) @@ -377,7 +377,7 @@ async def test_gdrive_full_scan_skips_over_quota(gdrive_full_scan_mocks, monkeyp "get_files_in_folder", AsyncMock(return_value=(page_files, None, None)), ) - m["download_mock"].return_value = ([], 0) + m["download_mock"].return_value = ([], []) m["batch_mock"].return_value = ([], 2, 0) _indexed, skipped, _unsup = await _run_gdrive_full_scan(m) @@ -403,7 +403,7 @@ async def test_gdrive_full_scan_deducts_after_indexing( AsyncMock(return_value=(page_files, None, None)), ) mock_docs = [MagicMock() for _ in range(3)] - m["download_mock"].return_value = (mock_docs, 0) + m["download_mock"].return_value = (mock_docs, []) m["batch_mock"].return_value = ([], 3, 0) await _run_gdrive_full_scan(m) @@ -438,7 +438,7 @@ async def test_gdrive_delta_sync_skips_over_quota(monkeypatch): _mod, "_should_skip_file", AsyncMock(return_value=(False, None)) ) - download_mock = AsyncMock(return_value=([], 0)) + download_mock = AsyncMock(return_value=([], [])) monkeypatch.setattr(_mod, "_download_files_parallel", download_mock) batch_mock = AsyncMock(return_value=([], 2, 0)) From 8699befaa0d8d49ae2850cf3320b71768ebaffcc Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:25 +0200 Subject: [PATCH 09/28] fix(indexing): log and recover session in rollback_and_persist_failure --- .../indexing_pipeline/document_persistence.py | 22 +++++++++++++++++-- 1 file changed, 20 insertions(+), 2 deletions(-) diff --git a/surfsense_backend/app/indexing_pipeline/document_persistence.py b/surfsense_backend/app/indexing_pipeline/document_persistence.py index d7810e516..9fd8867e2 100644 --- a/surfsense_backend/app/indexing_pipeline/document_persistence.py +++ b/surfsense_backend/app/indexing_pipeline/document_persistence.py @@ -1,3 +1,5 @@ +import contextlib +import logging from datetime import UTC, datetime from sqlalchemy.ext.asyncio import AsyncSession @@ -6,6 +8,8 @@ from sqlalchemy.orm.attributes import set_committed_value from app.db import Document, DocumentStatus +logger = logging.getLogger(__name__) + async def rollback_and_persist_failure( session: AsyncSession, document: Document, message: str @@ -18,14 +22,28 @@ async def rollback_and_persist_failure( try: await session.rollback() except Exception: - return # Session is completely dead; nothing further we can do. + # Session is completely dead; surface it but never raise. + logger.warning( + "Rollback failed; cannot persist failed status for document %s", + getattr(document, "id", "unknown"), + exc_info=True, + ) + return try: await session.refresh(document) document.updated_at = datetime.now(UTC) document.status = DocumentStatus.failed(message) await session.commit() except Exception: - pass # Best-effort; document will be retried on the next sync. + # Best-effort: the document stays non-ready and is retried next sync. + # Log it so a permanently-stuck document is at least traceable. + logger.warning( + "Could not persist failed status for document %s; will retry next sync", + getattr(document, "id", "unknown"), + exc_info=True, + ) + with contextlib.suppress(Exception): + await session.rollback() def attach_chunks_to_document(document: Document, chunks: list) -> None: From c0c5f3414ec29282e60b478de2745f7fbfe3889b Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:25 +0200 Subject: [PATCH 10/28] fix(google-drive): sanitize ETL reason and retry stuck pending/processing files --- .../tasks/connector_indexers/google_drive_indexer.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index 29550c215..b76f84bac 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -35,6 +35,7 @@ from app.connectors.google_drive.file_types import ( from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_hashing import compute_identifier_hash +from app.indexing_pipeline.exceptions import safe_exception_message from app.indexing_pipeline.indexing_pipeline_service import ( IndexingPipelineService, PlaceholderInfo, @@ -368,7 +369,12 @@ async def _should_skip_file( logger.info(f"Rename-only update: '{old_name}' → '{file_name}'") return True, f"File renamed: '{old_name}' → '{file_name}'" - if not DocumentStatus.is_state(existing.status, DocumentStatus.READY): + state = DocumentStatus.get_state(existing.status) + if state in (DocumentStatus.PENDING, DocumentStatus.PROCESSING): + # Stuck placeholder/in-progress doc (e.g. worker died mid-index): re-index + # instead of skipping, otherwise it never recovers. + return False, None + if state != DocumentStatus.READY: return True, "skipped (previously failed)" return True, "unchanged" @@ -510,7 +516,7 @@ async def _download_files_parallel( continue file_id = file.get("id") if isinstance(outcome, Exception): - reason = f"Download/ETL error: {outcome}" + reason = f"Download/ETL error: {safe_exception_message(outcome)}" logger.warning( "Download/ETL exception for %s: %s", file.get("name", "Unknown"), From 464e7d45544cbbe68f9643115f98764c9a064660 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:25 +0200 Subject: [PATCH 11/28] fix(onedrive): sanitize ETL reason and retry stuck pending/processing files --- .../app/tasks/connector_indexers/onedrive_indexer.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py index f98b330d7..3fd8a79f2 100644 --- a/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/onedrive_indexer.py @@ -26,6 +26,7 @@ from app.connectors.onedrive.file_types import should_skip_file as skip_item from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_hashing import compute_identifier_hash +from app.indexing_pipeline.exceptions import safe_exception_message from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.page_limit_service import PageLimitService from app.services.task_logging_service import TaskLoggingService @@ -120,7 +121,12 @@ async def _should_skip_file( logger.info(f"Rename-only update: '{old_name}' -> '{file_name}'") return True, f"File renamed: '{old_name}' -> '{file_name}'" - if not DocumentStatus.is_state(existing.status, DocumentStatus.READY): + state = DocumentStatus.get_state(existing.status) + if state in (DocumentStatus.PENDING, DocumentStatus.PROCESSING): + # Stuck placeholder/in-progress doc (e.g. worker died mid-index): re-index + # instead of skipping, otherwise it never recovers. + return False, None + if state != DocumentStatus.READY: return True, "skipped (previously failed)" return True, "unchanged" @@ -217,7 +223,7 @@ async def _download_files_parallel( continue file_id = file.get("id") if isinstance(outcome, Exception): - reason = f"Download/ETL error: {outcome}" + reason = f"Download/ETL error: {safe_exception_message(outcome)}" logger.warning( "Download/ETL exception for %s: %s", file.get("name", "Unknown"), From 33300e4faacdb52b82f218173bb5ab6715c0cebc Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:25 +0200 Subject: [PATCH 12/28] fix(dropbox): sanitize ETL reason and retry stuck pending/processing files --- .../app/tasks/connector_indexers/dropbox_indexer.py | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py index 76fa34159..7cd3e1613 100644 --- a/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/dropbox_indexer.py @@ -26,6 +26,7 @@ from app.connectors.dropbox.file_types import should_skip_file as skip_item from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.indexing_pipeline.connector_document import ConnectorDocument from app.indexing_pipeline.document_hashing import compute_identifier_hash +from app.indexing_pipeline.exceptions import safe_exception_message from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService from app.services.page_limit_service import PageLimitService from app.services.task_logging_service import TaskLoggingService @@ -113,7 +114,12 @@ async def _should_skip_file( logger.info(f"Rename-only update: '{old_name}' -> '{file_name}'") return True, f"File renamed: '{old_name}' -> '{file_name}'" - if not DocumentStatus.is_state(existing.status, DocumentStatus.READY): + state = DocumentStatus.get_state(existing.status) + if state in (DocumentStatus.PENDING, DocumentStatus.PROCESSING): + # Stuck placeholder/in-progress doc (e.g. worker died mid-index): re-index + # instead of skipping, otherwise it never recovers. + return False, None + if state != DocumentStatus.READY: return True, "skipped (previously failed)" return True, "unchanged" @@ -210,7 +216,7 @@ async def _download_files_parallel( continue file_id = file.get("id") if isinstance(outcome, Exception): - reason = f"Download/ETL error: {outcome}" + reason = f"Download/ETL error: {safe_exception_message(outcome)}" logger.warning( "Download/ETL exception for %s: %s", file.get("name", "Unknown"), From 90b32a88805060adab5a5f0d35cc7f7f805e3702 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:42 +0200 Subject: [PATCH 13/28] fix(notion): fail skipped placeholders so they don't stay pending --- .../tasks/connector_indexers/notion_indexer.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index 1ca9ca4ba..9ebafbcdb 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -27,6 +27,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -343,6 +344,23 @@ async def index_notion_pages( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.NOTION_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) From b2c2fc9c2e4f49567227011771e4c61079227c0c Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:42 +0200 Subject: [PATCH 14/28] fix(gmail): fail skipped placeholders so they don't stay pending --- .../connector_indexers/google_gmail_indexer.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index 225e3618e..25da96b61 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -29,6 +29,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -479,6 +480,23 @@ async def index_google_gmail_messages( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) From 1b0912aaa3b3dad413bd33dece65b1a44b34f494 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:42 +0200 Subject: [PATCH 15/28] fix(calendar): fail skipped placeholders so they don't stay pending --- .../google_calendar_indexer.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 97f01d68b..51df39171 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -28,6 +28,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, parse_date_flexible, update_connector_last_indexed, ) @@ -448,6 +449,23 @@ async def index_google_calendar_events( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) From d70d01f331e649c546107977b83a1c7356cbb205 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:42 +0200 Subject: [PATCH 16/28] fix(linear): fail skipped placeholders so they don't stay pending --- .../tasks/connector_indexers/linear_indexer.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index 12749b82b..2bde77f79 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -25,6 +25,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -303,6 +304,23 @@ async def index_linear_issues( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.LINEAR_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + # ── Finalize ────────────────────────────────────────────────── await update_connector_last_indexed(session, connector, update_last_indexed) From c47949791b295b27ac668d1a4d50d6642c4089a0 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:42 +0200 Subject: [PATCH 17/28] fix(confluence): fail skipped placeholders so they don't stay pending --- .../connector_indexers/confluence_indexer.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 1187edd98..53c438197 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -21,6 +21,7 @@ from .base import ( check_duplicate_document_by_hash, get_connector_by_id, logger, + mark_connector_documents_failed, update_connector_last_indexed, ) @@ -295,6 +296,23 @@ async def index_confluence_pages( heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS, ) + # Placeholders for items skipped above (empty/duplicate/unbuildable) would + # otherwise stay stuck in 'pending' and undeletable. Fail them so they're + # recoverable. Leaves already-ready docs untouched. + indexed_ids = {doc.unique_id for doc in connector_docs} + stuck_placeholders = [ + (p.unique_id, "Skipped during sync: no indexable content") + for p in placeholders + if p.unique_id and p.unique_id not in indexed_ids + ] + if stuck_placeholders: + await mark_connector_documents_failed( + session, + document_type=DocumentType.CONFLUENCE_CONNECTOR, + search_space_id=search_space_id, + failures=stuck_placeholders, + ) + await update_connector_last_indexed(session, connector, update_last_indexed) logger.info( From be8a3bcd0045343c9e53f266122ba583a86535dc Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:52 +0200 Subject: [PATCH 18/28] fix(slack): commit failed status immediately --- .../app/tasks/connector_indexers/slack_indexer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index 2c6d0e11e..ac63af38c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -9,6 +9,7 @@ Uses 2-phase document status updates for real-time UI feedback: - Phase 2: Process each document: pending → processing → ready/failed """ +import contextlib import time from collections.abc import Awaitable, Callable from datetime import datetime @@ -586,10 +587,15 @@ async def index_slack_messages( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue From 791b0afe16bb206683c9b72dc8e74f37d3be1064 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:52 +0200 Subject: [PATCH 19/28] fix(discord): commit failed status immediately --- .../app/tasks/connector_indexers/discord_indexer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 180f21412..8c5bd8f0e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -10,6 +10,7 @@ Uses 2-phase document status updates for real-time UI feedback: """ import asyncio +import contextlib import time from collections.abc import Awaitable, Callable from datetime import UTC, datetime, timedelta @@ -713,10 +714,15 @@ async def index_discord_messages( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue From f085ac59e5e593aa2a6f9572227754d2532d6a11 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:52 +0200 Subject: [PATCH 20/28] fix(teams): commit failed status immediately --- .../app/tasks/connector_indexers/teams_indexer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 25994895a..e48aedaa5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -10,6 +10,7 @@ Uses 2-phase document status updates for real-time UI feedback: """ import asyncio +import contextlib import time from collections.abc import Awaitable, Callable from datetime import UTC, datetime @@ -630,11 +631,16 @@ async def index_teams_messages( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( "Failed to update document status to failed: %s", str(status_error), ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue From f5dd8f398531fdc4308f292ce6e98a0312b94ea3 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:52 +0200 Subject: [PATCH 21/28] fix(github): commit failed status immediately --- .../app/tasks/connector_indexers/github_indexer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index 1d0b004d8..ce9b80e5e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -9,6 +9,7 @@ Implements 2-phase document status updates for real-time UI feedback: - Phase 2: Process each document: pending → processing → ready/failed """ +import contextlib import time from collections.abc import Awaitable, Callable from datetime import UTC, datetime @@ -413,10 +414,15 @@ async def index_github_repos( try: document.status = DocumentStatus.failed(str(repo_err)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() errors.append(f"Failed processing {repo_full_name}: {repo_err}") documents_failed += 1 continue From 45438249b626453f607320a71a1811bcfd556c32 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:52 +0200 Subject: [PATCH 22/28] fix(clickup): commit failed status immediately --- .../app/tasks/connector_indexers/clickup_indexer.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index 7b40a4b22..91763129f 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -437,10 +437,15 @@ async def index_clickup_tasks( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue From 8191118eb4a1a879ca4138e7e724cb5375770fd2 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:52 +0200 Subject: [PATCH 23/28] fix(bookstack): commit failed status immediately --- .../app/tasks/connector_indexers/bookstack_indexer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index 74234a3b9..6471ffb00 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -6,6 +6,7 @@ Implements 2-phase document status updates for real-time UI feedback: - Phase 2: Process each page: pending → processing → ready/failed """ +import contextlib import time from collections.abc import Awaitable, Callable from datetime import datetime @@ -432,10 +433,15 @@ async def index_bookstack_pages( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() skipped_pages.append( f"{item.get('page_name', 'Unknown')} (processing error)" ) From e3afe9d7c73298d7b1b3f2321803c6698641571b Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:52 +0200 Subject: [PATCH 24/28] fix(luma): commit failed status immediately --- .../app/tasks/connector_indexers/luma_indexer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index 9bcba5a37..eab2c9793 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -7,6 +7,7 @@ Implements 2-phase document status updates for real-time UI feedback: """ import asyncio +import contextlib import time from collections.abc import Awaitable, Callable from datetime import datetime, timedelta @@ -485,10 +486,15 @@ async def index_luma_events( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() skipped_events.append( f"{item.get('event_name', 'Unknown')} (processing error)" ) From c26181d0867267e1cfef95baf42150056a429ffd Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:52 +0200 Subject: [PATCH 25/28] fix(airtable): commit failed status immediately --- .../app/tasks/connector_indexers/airtable_indexer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index ac38b7bf7..e2a1b109a 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -6,6 +6,7 @@ Implements real-time document status updates using a two-phase approach: - Phase 2: Process each document one by one (pending → processing → ready/failed) """ +import contextlib import time from collections.abc import Awaitable, Callable @@ -432,10 +433,15 @@ async def index_airtable_records( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue From ba687813c166a110a612909586278c5e910a885a Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:10:52 +0200 Subject: [PATCH 26/28] fix(elasticsearch): commit failed status immediately --- .../app/tasks/connector_indexers/elasticsearch_indexer.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 3283b41eb..ba0aa3445 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -6,6 +6,7 @@ Implements 2-phase document status updates for real-time UI feedback: - Phase 2: Process each document: pending → processing → ready/failed """ +import contextlib import json import logging import time @@ -406,10 +407,15 @@ async def index_elasticsearch_documents( try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() + # Commit now so the failed status survives a later rollback or + # crash; otherwise the doc stays stuck in pending/processing. + await session.commit() except Exception as status_error: logger.error( f"Failed to update document status to failed: {status_error}" ) + with contextlib.suppress(Exception): + await session.rollback() documents_failed += 1 continue From 77544ab768fe96e8f8430e5c57f1c7bf9f950521 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:11:00 +0200 Subject: [PATCH 27/28] test(google-drive): assert stuck pending/processing docs retry --- .../indexing_pipeline/test_drive_pipeline.py | 72 +++++++++++++++++++ 1 file changed, 72 insertions(+) diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py index c7565f4ba..6e85421ea 100644 --- a/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py @@ -177,3 +177,75 @@ async def test_should_skip_file_skips_failed_document( assert should_skip, "FAILED documents must be skipped during automatic sync" assert "failed" in msg.lower() + + +@pytest.mark.parametrize("stuck_state", ["pending", "processing"]) +async def test_should_skip_file_retries_stuck_document( + db_session, + db_search_space, + db_user, + stuck_state, +): + """A doc stuck in pending/processing (worker died mid-index) must re-index, not skip.""" + import importlib + import sys + import types + + pkg = "app.tasks.connector_indexers" + stub = pkg not in sys.modules + if stub: + mod = types.ModuleType(pkg) + mod.__path__ = ["app/tasks/connector_indexers"] + mod.__package__ = pkg + sys.modules[pkg] = mod + + try: + gdm = importlib.import_module( + "app.tasks.connector_indexers.google_drive_indexer" + ) + _should_skip_file = gdm._should_skip_file + finally: + if stub: + sys.modules.pop(pkg, None) + + space_id = db_search_space.id + file_id = f"file-{stuck_state}-drive" + md5 = "stuck123checksum" + + doc_hash = compute_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE.value, file_id, space_id + ) + status = ( + DocumentStatus.pending() + if stuck_state == "pending" + else DocumentStatus.processing() + ) + stuck_doc = Document( + title="Stuck File.pdf", + document_type=DocumentType.GOOGLE_DRIVE_FILE, + content="Pending...", + content_hash=f"ch-{doc_hash[:12]}", + unique_identifier_hash=doc_hash, + source_markdown="", + search_space_id=space_id, + created_by_id=str(db_user.id), + status=status, + document_metadata={ + "google_drive_file_id": file_id, + "google_drive_file_name": "Stuck File.pdf", + "md5_checksum": md5, + }, + ) + db_session.add(stuck_doc) + await db_session.flush() + + incoming_file = { + "id": file_id, + "name": "Stuck File.pdf", + "mimeType": "application/pdf", + "md5Checksum": md5, + } + + should_skip, _msg = await _should_skip_file(db_session, incoming_file, space_id) + + assert not should_skip, f"{stuck_state} documents must re-index, not be skipped" From 59c1cf14c7c14812a20f438a62a9c36647c22134 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 10 Jun 2026 00:11:00 +0200 Subject: [PATCH 28/28] test(indexers): cover mark_connector_documents_failed behavior --- .../test_mark_connector_documents_failed.py | 110 ++++++++++++++++++ 1 file changed, 110 insertions(+) create mode 100644 surfsense_backend/tests/integration/indexing_pipeline/test_mark_connector_documents_failed.py diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_mark_connector_documents_failed.py b/surfsense_backend/tests/integration/indexing_pipeline/test_mark_connector_documents_failed.py new file mode 100644 index 000000000..9e3feee1e --- /dev/null +++ b/surfsense_backend/tests/integration/indexing_pipeline/test_mark_connector_documents_failed.py @@ -0,0 +1,110 @@ +"""Integration tests for mark_connector_documents_failed. + +Covers the ETL-failure recovery path: a connector placeholder must move out of +``pending``/``processing`` into ``failed`` so it stays deletable, while a +``ready`` document is never clobbered. +""" + +import hashlib + +import pytest +from sqlalchemy import select + +from app.db import Document, DocumentStatus, DocumentType +from app.indexing_pipeline.document_hashing import compute_identifier_hash +from app.tasks.connector_indexers.base import mark_connector_documents_failed + +pytestmark = pytest.mark.integration + + +async def _make_doc( + db_session, + *, + search_space_id: int, + connector_id: int, + user_id: str, + file_id: str, + status: dict, +) -> Document: + uid_hash = compute_identifier_hash( + DocumentType.GOOGLE_DRIVE_FILE.value, file_id, search_space_id + ) + doc = Document( + title=f"{file_id}.pdf", + document_type=DocumentType.GOOGLE_DRIVE_FILE, + content="Pending...", + content_hash=hashlib.sha256(f"placeholder:{uid_hash}".encode()).hexdigest(), + unique_identifier_hash=uid_hash, + document_metadata={"google_drive_file_id": file_id}, + search_space_id=search_space_id, + connector_id=connector_id, + created_by_id=user_id, + status=status, + ) + db_session.add(doc) + await db_session.flush() + return doc + + +async def test_pending_placeholder_marked_failed( + db_session, db_search_space, db_connector, db_user +): + doc = await _make_doc( + db_session, + search_space_id=db_search_space.id, + connector_id=db_connector.id, + user_id=str(db_user.id), + file_id="file-pending", + status=DocumentStatus.pending(), + ) + + marked = await mark_connector_documents_failed( + db_session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=db_search_space.id, + failures=[("file-pending", "Download/ETL failed: boom")], + ) + + assert marked == 1 + await db_session.refresh(doc) + assert DocumentStatus.is_state(doc.status, DocumentStatus.FAILED) + assert doc.status.get("reason") == "Download/ETL failed: boom" + + +async def test_ready_document_not_clobbered( + db_session, db_search_space, db_connector, db_user +): + doc = await _make_doc( + db_session, + search_space_id=db_search_space.id, + connector_id=db_connector.id, + user_id=str(db_user.id), + file_id="file-ready", + status=DocumentStatus.ready(), + ) + + marked = await mark_connector_documents_failed( + db_session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=db_search_space.id, + failures=[("file-ready", "should be ignored")], + ) + + assert marked == 0 + await db_session.refresh(doc) + assert DocumentStatus.is_state(doc.status, DocumentStatus.READY) + + +async def test_missing_document_is_noop(db_session, db_search_space): + marked = await mark_connector_documents_failed( + db_session, + document_type=DocumentType.GOOGLE_DRIVE_FILE, + search_space_id=db_search_space.id, + failures=[("does-not-exist", "reason")], + ) + + assert marked == 0 + result = await db_session.execute( + select(Document).filter(Document.search_space_id == db_search_space.id) + ) + assert result.scalars().first() is None