fix: tighten indexing pipeline exception handling and logging

This commit is contained in:
CREDO23 2026-02-25 17:44:35 +02:00
parent 5be58b78ad
commit 86ecb82c6e
3 changed files with 11 additions and 12 deletions

View file

@ -38,7 +38,6 @@ EMBEDDING_ERRORS = (
RuntimeError, # local device failure or API backend normalization
OSError, # model files missing or corrupted (local backends)
MemoryError, # document too large for available RAM
ValueError, # invalid input to encode()
)
@ -57,13 +56,9 @@ class PipelineMessages:
LLM_UNPROCESSABLE = "Document exceeds the LLM context window even after optimization."
LLM_RESPONSE = "LLM returned an invalid response."
DB_TRANSIENT = "Database error during indexing. Will retry on next sync."
DB_SESSION_DEAD = "Database session is in an unrecoverable state."
EMBEDDING_FAILED = "Embedding failed. Check your embedding model configuration or service."
EMBEDDING_MODEL = "Embedding model files are missing or corrupted."
EMBEDDING_MEMORY = "Not enough memory to embed this document."
EMBEDDING_INPUT = "Document content is invalid for the embedding model."
CHUNKING_OVERFLOW = "Document structure is too deeply nested to chunk."
@ -121,8 +116,6 @@ def embedding_message(exc: Exception) -> str:
return PipelineMessages.EMBEDDING_MODEL
if isinstance(exc, MemoryError):
return PipelineMessages.EMBEDDING_MEMORY
if isinstance(exc, ValueError):
return PipelineMessages.EMBEDDING_INPUT
return safe_exception_message(exc)
except Exception:
return "Something went wrong when generating the embedding."

View file

@ -23,6 +23,7 @@ from app.indexing_pipeline.exceptions import (
)
from app.indexing_pipeline.pipeline_logger import (
PipelineLogContext,
log_batch_aborted,
log_chunking_overflow,
log_doc_skipped_unknown,
log_document_queued,
@ -129,13 +130,17 @@ class IndexingPipelineService:
try:
await self.session.commit()
return documents
except IntegrityError as e:
except IntegrityError:
# A concurrent worker committed a document with the same content_hash
# or unique_identifier_hash between our check and our INSERT.
# The document already exists — roll back and let the next sync run handle it.
log_race_condition(batch_ctx)
await self.session.rollback()
return []
except Exception as e:
log_batch_aborted(batch_ctx, e)
await self.session.rollback()
return []
async def index(
self, document: Document, connector_doc: ConnectorDocument, llm

View file

@ -17,7 +17,6 @@ class LogMessages:
DOCUMENT_QUEUED = "New document queued for indexing."
DOCUMENT_UPDATED = "Document content changed, re-queued for indexing."
DOCUMENT_REQUEUED = "Stuck document re-queued for indexing."
DOC_SKIPPED_DB = "Transient DB error — document skipped, will retry on next sync."
DOC_SKIPPED_UNKNOWN = "Unexpected error — document skipped."
BATCH_ABORTED = "Fatal DB error — aborting prepare batch."
RACE_CONDITION = "Concurrent worker beat us to the commit — rolling back batch."
@ -29,8 +28,6 @@ class LogMessages:
LLM_PERMANENT = "Permanent LLM error — document marked failed."
EMBEDDING_FAILED = "Embedding error — document marked failed."
CHUNKING_OVERFLOW = "Chunking overflow — document marked failed."
DB_TRANSIENT = "Transient DB error — document marked failed."
DB_FATAL = "Fatal DB error — session is dead, re-raising."
UNEXPECTED = "Unexpected error — document marked failed."
@ -87,6 +84,10 @@ def log_race_condition(ctx: PipelineLogContext) -> None:
_safe_log(logger.warning, LogMessages.RACE_CONDITION, ctx)
def log_batch_aborted(ctx: PipelineLogContext, exc: Exception) -> None:
    """Record, at error level, that the prepare batch was aborted.

    Emits ``LogMessages.BATCH_ABORTED`` through ``_safe_log``, handing the
    exception over twice: once as ``exc_info`` and once as the ``error``
    field.

    Args:
        ctx: Pipeline log context describing the batch being aborted.
        exc: The exception that caused the batch to be aborted.
    """
    _safe_log(
        logger.error,
        LogMessages.BATCH_ABORTED,
        ctx,
        exc_info=exc,
        error=exc,
    )
# ── index ─────────────────────────────────────────────────────────────────────
def log_index_started(ctx: PipelineLogContext) -> None:
@ -98,7 +99,7 @@ def log_index_success(ctx: PipelineLogContext, chunk_count: int) -> None:
def log_retryable_llm_error(ctx: PipelineLogContext, exc: Exception) -> None:
    """Log a retryable LLM error as a single warning record.

    The exception is passed both as ``exc_info`` and as the ``error`` field,
    matching the other error loggers in this module that attach the
    exception via ``exc_info``.

    Args:
        ctx: Pipeline log context for the document being indexed.
        exc: The retryable exception raised by the LLM call.
    """
    # Exactly one call: emitting both the plain and the exc_info variants
    # would write the same warning twice for a single failure.
    # NOTE(review): assumes _safe_log forwards exc_info to the underlying
    # logger call so the traceback is attached — confirm against _safe_log.
    _safe_log(logger.warning, LogMessages.LLM_RETRYABLE, ctx, exc_info=exc, error=exc)
def log_permanent_llm_error(ctx: PipelineLogContext, exc: Exception) -> None: