add structured error handling to indexing pipeline

This commit is contained in:
CREDO23 2026-02-25 15:26:04 +02:00
parent ca870cf660
commit 0aeb888be0
2 changed files with 171 additions and 6 deletions

View file

@ -0,0 +1,125 @@
from litellm.exceptions import (
APIConnectionError,
APIResponseValidationError,
AuthenticationError,
BadGatewayError,
BadRequestError,
InternalServerError,
NotFoundError,
PermissionDeniedError,
RateLimitError,
ServiceUnavailableError,
Timeout,
UnprocessableEntityError,
)
from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError, SQLAlchemyError
from sqlalchemy.orm.exc import DetachedInstanceError
# Tuples for use directly in except clauses.
RETRYABLE_LLM_ERRORS = (
RateLimitError,
Timeout,
ServiceUnavailableError,
BadGatewayError,
InternalServerError,
APIConnectionError,
)
PERMANENT_LLM_ERRORS = (
AuthenticationError,
PermissionDeniedError,
NotFoundError,
BadRequestError,
UnprocessableEntityError,
APIResponseValidationError,
)
TRANSIENT_DB_ERRORS = (
OperationalError,
IntegrityError,
)
# Session is broken after these — re-raise instead of attempting further DB calls.
FATAL_DB_ERRORS = (
InvalidRequestError,
DetachedInstanceError,
)
# (LiteLLMEmbeddings, CohereEmbeddings, GeminiEmbeddings all normalize to RuntimeError).
EMBEDDING_ERRORS = (
RuntimeError,
OSError,
MemoryError,
ValueError,
)
class PipelineMessages:
RATE_LIMIT = "LLM rate limit exceeded. Will retry on next sync."
LLM_TIMEOUT = "LLM request timed out. Will retry on next sync."
LLM_UNAVAILABLE = "LLM service temporarily unavailable. Will retry on next sync."
LLM_BAD_GATEWAY = "LLM gateway error. Will retry on next sync."
LLM_SERVER_ERROR = "LLM internal server error. Will retry on next sync."
LLM_CONNECTION = "Could not reach the LLM service. Check network connectivity."
LLM_AUTH = "LLM authentication failed. Check your API key."
LLM_PERMISSION = "LLM request denied. Check your account permissions."
LLM_NOT_FOUND = "LLM model not found. Check your model configuration."
LLM_BAD_REQUEST = "LLM rejected the request. Document content may be invalid."
LLM_UNPROCESSABLE = "Document exceeds the LLM context window even after optimization."
LLM_RESPONSE = "LLM returned an invalid response."
DB_TRANSIENT = "Database error during indexing. Will retry on next sync."
DB_SESSION_DEAD = "Database session is in an unrecoverable state."
EMBEDDING_FAILED = "Embedding failed. Check your embedding model configuration or service."
EMBEDDING_MODEL = "Embedding model files are missing or corrupted."
EMBEDDING_MEMORY = "Not enough memory to embed this document."
EMBEDDING_INPUT = "Document content is invalid for the embedding model."
CHUNKING_OVERFLOW = "Document structure is too deeply nested to chunk."
def llm_retryable_message(exc: Exception) -> str:
if isinstance(exc, RateLimitError):
return PipelineMessages.RATE_LIMIT
if isinstance(exc, Timeout):
return PipelineMessages.LLM_TIMEOUT
if isinstance(exc, ServiceUnavailableError):
return PipelineMessages.LLM_UNAVAILABLE
if isinstance(exc, BadGatewayError):
return PipelineMessages.LLM_BAD_GATEWAY
if isinstance(exc, InternalServerError):
return PipelineMessages.LLM_SERVER_ERROR
if isinstance(exc, APIConnectionError):
return PipelineMessages.LLM_CONNECTION
return str(exc)
def llm_permanent_message(exc: Exception) -> str:
if isinstance(exc, AuthenticationError):
return PipelineMessages.LLM_AUTH
if isinstance(exc, PermissionDeniedError):
return PipelineMessages.LLM_PERMISSION
if isinstance(exc, NotFoundError):
return PipelineMessages.LLM_NOT_FOUND
if isinstance(exc, BadRequestError):
return PipelineMessages.LLM_BAD_REQUEST
if isinstance(exc, UnprocessableEntityError):
return PipelineMessages.LLM_UNPROCESSABLE
if isinstance(exc, APIResponseValidationError):
return PipelineMessages.LLM_RESPONSE
return str(exc)
def embedding_message(exc: Exception) -> str:
if isinstance(exc, RuntimeError):
return PipelineMessages.EMBEDDING_FAILED
if isinstance(exc, OSError):
return PipelineMessages.EMBEDDING_MODEL
if isinstance(exc, MemoryError):
return PipelineMessages.EMBEDDING_MEMORY
if isinstance(exc, ValueError):
return PipelineMessages.EMBEDDING_INPUT
return str(exc)

View file

@ -1,7 +1,6 @@
from datetime import UTC, datetime
from sqlalchemy import delete, select
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import object_session
from sqlalchemy.orm.attributes import set_committed_value
@ -12,6 +11,27 @@ from app.indexing_pipeline.document_chunker import chunk_text
from app.indexing_pipeline.document_embedder import embed_text
from app.indexing_pipeline.document_hashing import compute_content_hash, compute_unique_identifier_hash
from app.indexing_pipeline.document_summarizer import summarize_document
from app.indexing_pipeline.exceptions import (
EMBEDDING_ERRORS,
FATAL_DB_ERRORS,
PERMANENT_LLM_ERRORS,
RETRYABLE_LLM_ERRORS,
TRANSIENT_DB_ERRORS,
IntegrityError,
PipelineMessages,
embedding_message,
llm_permanent_message,
llm_retryable_message,
)
async def _mark_failed(session: AsyncSession, document: Document, message: str) -> None:
"""Roll back the current transaction, refresh the document, and persist a failed status."""
await session.rollback()
await session.refresh(document)
document.updated_at = datetime.now(UTC)
document.status = DocumentStatus.failed(message)
await session.commit()
def _safe_set_chunks(document: Document, chunks: list) -> None:
@ -97,6 +117,12 @@ class IndexingPipelineService:
self.session.add(document)
documents.append(document)
except FATAL_DB_ERRORS:
# Session is broken — abort the entire batch, nothing else is safe to do.
await self.session.rollback()
return []
except TRANSIENT_DB_ERRORS:
await self.session.rollback()
except Exception:
continue
@ -150,9 +176,23 @@ class IndexingPipelineService:
document.status = DocumentStatus.ready()
await self.session.commit()
except RETRYABLE_LLM_ERRORS as e:
await _mark_failed(self.session, document, llm_retryable_message(e))
except PERMANENT_LLM_ERRORS as e:
await _mark_failed(self.session, document, llm_permanent_message(e))
except EMBEDDING_ERRORS as e:
await _mark_failed(self.session, document, embedding_message(e))
except RecursionError:
await _mark_failed(self.session, document, PipelineMessages.CHUNKING_OVERFLOW)
except FATAL_DB_ERRORS:
raise
except TRANSIENT_DB_ERRORS:
await _mark_failed(self.session, document, PipelineMessages.DB_TRANSIENT)
except Exception as e:
await self.session.rollback()
await self.session.refresh(document)
document.updated_at = datetime.now(UTC)
document.status = DocumentStatus.failed(str(e))
await self.session.commit()
await _mark_failed(self.session, document, str(e))