simplify indexing pipeline DB error handling

This commit is contained in:
CREDO23 2026-02-25 16:59:09 +02:00
parent 66d7d3da8a
commit 5be58b78ad
3 changed files with 1 additions and 51 deletions

View file

@ -12,8 +12,7 @@ from litellm.exceptions import (
Timeout, Timeout,
UnprocessableEntityError, UnprocessableEntityError,
) )
from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm.exc import DetachedInstanceError
# Tuples for use directly in except clauses. # Tuples for use directly in except clauses.
RETRYABLE_LLM_ERRORS = ( RETRYABLE_LLM_ERRORS = (
@ -34,18 +33,6 @@ PERMANENT_LLM_ERRORS = (
APIResponseValidationError, APIResponseValidationError,
) )
TRANSIENT_DB_ERRORS = (
OperationalError,
IntegrityError,
)
# Session is broken after these — re-raise instead of attempting further DB calls.
FATAL_DB_ERRORS = (
InvalidRequestError,
DetachedInstanceError,
)
# (LiteLLMEmbeddings, CohereEmbeddings, GeminiEmbeddings all normalize to RuntimeError). # (LiteLLMEmbeddings, CohereEmbeddings, GeminiEmbeddings all normalize to RuntimeError).
EMBEDDING_ERRORS = ( EMBEDDING_ERRORS = (
RuntimeError, # local device failure or API backend normalization RuntimeError, # local device failure or API backend normalization

View file

@ -12,10 +12,8 @@ from app.indexing_pipeline.document_persistence import attach_chunks_to_document
from app.indexing_pipeline.document_summarizer import summarize_document from app.indexing_pipeline.document_summarizer import summarize_document
from app.indexing_pipeline.exceptions import ( from app.indexing_pipeline.exceptions import (
EMBEDDING_ERRORS, EMBEDDING_ERRORS,
FATAL_DB_ERRORS,
PERMANENT_LLM_ERRORS, PERMANENT_LLM_ERRORS,
RETRYABLE_LLM_ERRORS, RETRYABLE_LLM_ERRORS,
TRANSIENT_DB_ERRORS,
IntegrityError, IntegrityError,
PipelineMessages, PipelineMessages,
embedding_message, embedding_message,
@ -25,11 +23,7 @@ from app.indexing_pipeline.exceptions import (
) )
from app.indexing_pipeline.pipeline_logger import ( from app.indexing_pipeline.pipeline_logger import (
PipelineLogContext, PipelineLogContext,
log_batch_aborted,
log_chunking_overflow, log_chunking_overflow,
log_db_fatal_error,
log_db_transient_error,
log_doc_skipped_db,
log_doc_skipped_unknown, log_doc_skipped_unknown,
log_document_queued, log_document_queued,
log_document_requeued, log_document_requeued,
@ -129,15 +123,8 @@ class IndexingPipelineService:
documents.append(document) documents.append(document)
log_document_queued(ctx) log_document_queued(ctx)
except FATAL_DB_ERRORS as e:
log_batch_aborted(ctx, e)
await self.session.rollback()
return []
except TRANSIENT_DB_ERRORS as e:
log_doc_skipped_db(ctx, e)
except Exception as e: except Exception as e:
log_doc_skipped_unknown(ctx, e) log_doc_skipped_unknown(ctx, e)
continue
try: try:
await self.session.commit() await self.session.commit()
@ -214,14 +201,6 @@ class IndexingPipelineService:
log_embedding_error(ctx, e) log_embedding_error(ctx, e)
await rollback_and_persist_failure(self.session, document, embedding_message(e)) await rollback_and_persist_failure(self.session, document, embedding_message(e))
except FATAL_DB_ERRORS as e:
log_db_fatal_error(ctx, e)
raise
except TRANSIENT_DB_ERRORS as e:
log_db_transient_error(ctx, e)
await rollback_and_persist_failure(self.session, document, PipelineMessages.DB_TRANSIENT)
except Exception as e: except Exception as e:
log_unexpected_error(ctx, e) log_unexpected_error(ctx, e)
await rollback_and_persist_failure(self.session, document, safe_exception_message(e)) await rollback_and_persist_failure(self.session, document, safe_exception_message(e))

View file

@ -79,18 +79,10 @@ def log_document_requeued(ctx: PipelineLogContext) -> None:
_safe_log(logger.info, LogMessages.DOCUMENT_REQUEUED, ctx) _safe_log(logger.info, LogMessages.DOCUMENT_REQUEUED, ctx)
def log_doc_skipped_db(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.warning, LogMessages.DOC_SKIPPED_DB, ctx, error=exc)
def log_doc_skipped_unknown(ctx: PipelineLogContext, exc: Exception) -> None: def log_doc_skipped_unknown(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.warning, LogMessages.DOC_SKIPPED_UNKNOWN, ctx, exc_info=exc, error=exc) _safe_log(logger.warning, LogMessages.DOC_SKIPPED_UNKNOWN, ctx, exc_info=exc, error=exc)
def log_batch_aborted(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.critical, LogMessages.BATCH_ABORTED, ctx, exc_info=exc, error=exc)
def log_race_condition(ctx: PipelineLogContext) -> None: def log_race_condition(ctx: PipelineLogContext) -> None:
_safe_log(logger.warning, LogMessages.RACE_CONDITION, ctx) _safe_log(logger.warning, LogMessages.RACE_CONDITION, ctx)
@ -121,13 +113,5 @@ def log_chunking_overflow(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.error, LogMessages.CHUNKING_OVERFLOW, ctx, exc_info=exc, error=exc) _safe_log(logger.error, LogMessages.CHUNKING_OVERFLOW, ctx, exc_info=exc, error=exc)
def log_db_transient_error(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.warning, LogMessages.DB_TRANSIENT, ctx, error=exc)
def log_db_fatal_error(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.critical, LogMessages.DB_FATAL, ctx, exc_info=exc, error=exc)
def log_unexpected_error(ctx: PipelineLogContext, exc: Exception) -> None: def log_unexpected_error(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.error, LogMessages.UNEXPECTED, ctx, exc_info=exc, error=exc) _safe_log(logger.error, LogMessages.UNEXPECTED, ctx, exc_info=exc, error=exc)