From 0aeb888be0314612335fe5143c1843497831079b Mon Sep 17 00:00:00 2001
From: CREDO23
Date: Wed, 25 Feb 2026 15:26:04 +0200
Subject: [PATCH] add structured error handling to indexing pipeline

---
 .../app/indexing_pipeline/exceptions.py       | 125 ++++++++++++++++++
 .../indexing_pipeline_service.py              |  53 +++++++-
 2 files changed, 172 insertions(+), 6 deletions(-)
 create mode 100644 surfsense_backend/app/indexing_pipeline/exceptions.py

diff --git a/surfsense_backend/app/indexing_pipeline/exceptions.py b/surfsense_backend/app/indexing_pipeline/exceptions.py
new file mode 100644
index 000000000..bf4d80062
--- /dev/null
+++ b/surfsense_backend/app/indexing_pipeline/exceptions.py
@@ -0,0 +1,125 @@
+from litellm.exceptions import (
+    APIConnectionError,
+    APIResponseValidationError,
+    AuthenticationError,
+    BadGatewayError,
+    BadRequestError,
+    InternalServerError,
+    NotFoundError,
+    PermissionDeniedError,
+    RateLimitError,
+    ServiceUnavailableError,
+    Timeout,
+    UnprocessableEntityError,
+)
+from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError, SQLAlchemyError
+from sqlalchemy.orm.exc import DetachedInstanceError
+
+# Tuples for use directly in except clauses.
+RETRYABLE_LLM_ERRORS = (
+    RateLimitError,
+    Timeout,
+    ServiceUnavailableError,
+    BadGatewayError,
+    InternalServerError,
+    APIConnectionError,
+)
+
+PERMANENT_LLM_ERRORS = (
+    AuthenticationError,
+    PermissionDeniedError,
+    NotFoundError,
+    BadRequestError,
+    UnprocessableEntityError,
+    APIResponseValidationError,
+)
+
+TRANSIENT_DB_ERRORS = (
+    OperationalError,
+    IntegrityError,
+)
+
+# Session is broken after these — re-raise instead of attempting further DB calls.
+FATAL_DB_ERRORS = (
+    InvalidRequestError,
+    DetachedInstanceError,
+)
+
+
+# (LiteLLMEmbeddings, CohereEmbeddings, GeminiEmbeddings all normalize to RuntimeError).
+EMBEDDING_ERRORS = (
+    RuntimeError,
+    OSError,
+    MemoryError,
+    ValueError,
+)
+
+
+class PipelineMessages:
+    RATE_LIMIT = "LLM rate limit exceeded. Will retry on next sync."
+    LLM_TIMEOUT = "LLM request timed out. Will retry on next sync."
+    LLM_UNAVAILABLE = "LLM service temporarily unavailable. Will retry on next sync."
+    LLM_BAD_GATEWAY = "LLM gateway error. Will retry on next sync."
+    LLM_SERVER_ERROR = "LLM internal server error. Will retry on next sync."
+    LLM_CONNECTION = "Could not reach the LLM service. Check network connectivity."
+
+    LLM_AUTH = "LLM authentication failed. Check your API key."
+    LLM_PERMISSION = "LLM request denied. Check your account permissions."
+    LLM_NOT_FOUND = "LLM model not found. Check your model configuration."
+    LLM_BAD_REQUEST = "LLM rejected the request. Document content may be invalid."
+    LLM_UNPROCESSABLE = "Document exceeds the LLM context window even after optimization."
+    LLM_RESPONSE = "LLM returned an invalid response."
+
+    DB_TRANSIENT = "Database error during indexing. Will retry on next sync."
+    DB_SESSION_DEAD = "Database session is in an unrecoverable state."
+
+    EMBEDDING_FAILED = "Embedding failed. Check your embedding model configuration or service."
+    EMBEDDING_MODEL = "Embedding model files are missing or corrupted."
+    EMBEDDING_MEMORY = "Not enough memory to embed this document."
+    EMBEDDING_INPUT = "Document content is invalid for the embedding model."
+
+    CHUNKING_OVERFLOW = "Document structure is too deeply nested to chunk."
+
+
+def llm_retryable_message(exc: Exception) -> str:
+    if isinstance(exc, RateLimitError):
+        return PipelineMessages.RATE_LIMIT
+    if isinstance(exc, Timeout):
+        return PipelineMessages.LLM_TIMEOUT
+    if isinstance(exc, ServiceUnavailableError):
+        return PipelineMessages.LLM_UNAVAILABLE
+    if isinstance(exc, BadGatewayError):
+        return PipelineMessages.LLM_BAD_GATEWAY
+    if isinstance(exc, InternalServerError):
+        return PipelineMessages.LLM_SERVER_ERROR
+    if isinstance(exc, APIConnectionError):
+        return PipelineMessages.LLM_CONNECTION
+    return str(exc)
+
+
+def llm_permanent_message(exc: Exception) -> str:
+    if isinstance(exc, AuthenticationError):
+        return PipelineMessages.LLM_AUTH
+    if isinstance(exc, PermissionDeniedError):
+        return PipelineMessages.LLM_PERMISSION
+    if isinstance(exc, NotFoundError):
+        return PipelineMessages.LLM_NOT_FOUND
+    if isinstance(exc, BadRequestError):
+        return PipelineMessages.LLM_BAD_REQUEST
+    if isinstance(exc, UnprocessableEntityError):
+        return PipelineMessages.LLM_UNPROCESSABLE
+    if isinstance(exc, APIResponseValidationError):
+        return PipelineMessages.LLM_RESPONSE
+    return str(exc)
+
+
+def embedding_message(exc: Exception) -> str:
+    if isinstance(exc, RuntimeError):
+        return PipelineMessages.EMBEDDING_FAILED
+    if isinstance(exc, OSError):
+        return PipelineMessages.EMBEDDING_MODEL
+    if isinstance(exc, MemoryError):
+        return PipelineMessages.EMBEDDING_MEMORY
+    if isinstance(exc, ValueError):
+        return PipelineMessages.EMBEDDING_INPUT
+    return str(exc)
diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
index 9cbeedba8..0e114d998 100644
--- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
+++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py
@@ -1,7 +1,6 @@
 from datetime import UTC, datetime
 
 from sqlalchemy import delete, select
-from sqlalchemy.exc import IntegrityError
 from sqlalchemy.ext.asyncio import AsyncSession
 from sqlalchemy.orm import object_session
 from sqlalchemy.orm.attributes import set_committed_value
@@ -12,6 +11,27 @@ from app.indexing_pipeline.document_chunker import chunk_text
 from app.indexing_pipeline.document_embedder import embed_text
 from app.indexing_pipeline.document_hashing import compute_content_hash, compute_unique_identifier_hash
 from app.indexing_pipeline.document_summarizer import summarize_document
+from app.indexing_pipeline.exceptions import (
+    EMBEDDING_ERRORS,
+    FATAL_DB_ERRORS,
+    PERMANENT_LLM_ERRORS,
+    RETRYABLE_LLM_ERRORS,
+    TRANSIENT_DB_ERRORS,
+    IntegrityError,
+    PipelineMessages,
+    embedding_message,
+    llm_permanent_message,
+    llm_retryable_message,
+)
+
+
+async def _mark_failed(session: AsyncSession, document: Document, message: str) -> None:
+    """Roll back the current transaction, refresh the document, and persist a failed status."""
+    await session.rollback()
+    await session.refresh(document)
+    document.updated_at = datetime.now(UTC)
+    document.status = DocumentStatus.failed(message)
+    await session.commit()
 
 
 def _safe_set_chunks(document: Document, chunks: list) -> None:
@@ -97,6 +117,12 @@ class IndexingPipelineService:
                 self.session.add(document)
                 documents.append(document)
 
+            except FATAL_DB_ERRORS:
+                # Session is broken — abort the entire batch, nothing else is safe to do.
+                await self.session.rollback()
+                return []
+            except TRANSIENT_DB_ERRORS:
+                await self.session.rollback()
             except Exception:
                 continue
 
@@ -150,9 +176,24 @@ class IndexingPipelineService:
             document.status = DocumentStatus.ready()
             await self.session.commit()
 
+        except RETRYABLE_LLM_ERRORS as e:
+            await _mark_failed(self.session, document, llm_retryable_message(e))
+
+        except PERMANENT_LLM_ERRORS as e:
+            await _mark_failed(self.session, document, llm_permanent_message(e))
+
+        # RecursionError is a RuntimeError subclass; handle it before EMBEDDING_ERRORS.
+        except RecursionError:
+            await _mark_failed(self.session, document, PipelineMessages.CHUNKING_OVERFLOW)
+
+        except EMBEDDING_ERRORS as e:
+            await _mark_failed(self.session, document, embedding_message(e))
+
+        except FATAL_DB_ERRORS:
+            raise
+
+        except TRANSIENT_DB_ERRORS:
+            await _mark_failed(self.session, document, PipelineMessages.DB_TRANSIENT)
+
         except Exception as e:
-            await self.session.rollback()
-            await self.session.refresh(document)
-            document.updated_at = datetime.now(UTC)
-            document.status = DocumentStatus.failed(str(e))
-            await self.session.commit()
+            await _mark_failed(self.session, document, str(e))