diff --git a/surfsense_backend/app/tasks/connector_indexers/base.py b/surfsense_backend/app/tasks/connector_indexers/base.py index ffc8ab72e..9408874ca 100644 --- a/surfsense_backend/app/tasks/connector_indexers/base.py +++ b/surfsense_backend/app/tasks/connector_indexers/base.py @@ -2,6 +2,7 @@ Base functionality and shared imports for connector indexers. """ +import contextlib import logging from datetime import UTC, datetime, timedelta @@ -10,6 +11,8 @@ from sqlalchemy.future import select from app.db import ( Document, + DocumentStatus, + DocumentType, SearchSourceConnector, SearchSourceConnectorType, ) @@ -130,6 +133,59 @@ async def check_document_by_unique_identifier( return existing_doc_result.scalars().first() +async def mark_connector_documents_failed( + session: AsyncSession, + *, + document_type: DocumentType, + search_space_id: int, + failures: list[tuple[str, str]], +) -> int: + """Transition placeholder/in-progress documents to ``failed`` by source id. + + Without this, a document whose download/ETL fails stays stuck in + ``pending``/``processing`` forever: undeletable in the UI and never retried. + + ``failures`` is a list of ``(unique_id, reason)``. Best-effort: never raises, + and leaves ``ready`` documents untouched. Returns the number marked failed. + """ + if not failures: + return 0 + + from app.indexing_pipeline.document_hashing import compute_identifier_hash + + marked = 0 + try: + for unique_id, reason in failures: + if not unique_id: + continue + uid_hash = compute_identifier_hash( + document_type.value, unique_id, search_space_id + ) + existing = await check_document_by_unique_identifier(session, uid_hash) + if existing is None: + continue + if DocumentStatus.is_state(existing.status, DocumentStatus.READY): + continue + existing.status = DocumentStatus.failed(reason) + existing.updated_at = datetime.now(UTC) + marked += 1 + + if marked: + await session.commit() + except Exception: + with contextlib.suppress(Exception): + await session.rollback() + logger.warning( + "Failed to mark %d connector document(s) as failed (type=%s)", + len(failures), + getattr(document_type, "value", document_type), + exc_info=True, + ) + return 0 + + return marked + + async def get_connector_by_id( session: AsyncSession, connector_id: int, connector_type: SearchSourceConnectorType ) -> SearchSourceConnector | None: