add structured logging to indexing pipeline

This commit is contained in:
CREDO23 2026-02-25 16:04:35 +02:00
parent 610080bfef
commit b6c25628c8
3 changed files with 249 additions and 49 deletions

View file

@ -12,7 +12,7 @@ from litellm.exceptions import (
Timeout, Timeout,
UnprocessableEntityError, UnprocessableEntityError,
) )
from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError, SQLAlchemyError from sqlalchemy.exc import IntegrityError, InvalidRequestError, OperationalError
from sqlalchemy.orm.exc import DetachedInstanceError from sqlalchemy.orm.exc import DetachedInstanceError
# Tuples for use directly in except clauses. # Tuples for use directly in except clauses.
@ -48,10 +48,10 @@ FATAL_DB_ERRORS = (
# (LiteLLMEmbeddings, CohereEmbeddings, GeminiEmbeddings all normalize to RuntimeError). # (LiteLLMEmbeddings, CohereEmbeddings, GeminiEmbeddings all normalize to RuntimeError).
EMBEDDING_ERRORS = ( EMBEDDING_ERRORS = (
RuntimeError, RuntimeError, # local device failure or API backend normalization
OSError, OSError, # model files missing or corrupted (local backends)
MemoryError, MemoryError, # document too large for available RAM
ValueError, ValueError, # invalid input to encode()
) )
@ -81,45 +81,61 @@ class PipelineMessages:
CHUNKING_OVERFLOW = "Document structure is too deeply nested to chunk." CHUNKING_OVERFLOW = "Document structure is too deeply nested to chunk."
def safe_exception_message(exc: Exception) -> str:
try:
return str(exc)
except Exception:
return "Something went wrong during indexing. Error details could not be retrieved."
def llm_retryable_message(exc: Exception) -> str: def llm_retryable_message(exc: Exception) -> str:
if isinstance(exc, RateLimitError): try:
return PipelineMessages.RATE_LIMIT if isinstance(exc, RateLimitError):
if isinstance(exc, Timeout): return PipelineMessages.RATE_LIMIT
return PipelineMessages.LLM_TIMEOUT if isinstance(exc, Timeout):
if isinstance(exc, ServiceUnavailableError): return PipelineMessages.LLM_TIMEOUT
return PipelineMessages.LLM_UNAVAILABLE if isinstance(exc, ServiceUnavailableError):
if isinstance(exc, BadGatewayError): return PipelineMessages.LLM_UNAVAILABLE
return PipelineMessages.LLM_BAD_GATEWAY if isinstance(exc, BadGatewayError):
if isinstance(exc, InternalServerError): return PipelineMessages.LLM_BAD_GATEWAY
return PipelineMessages.LLM_SERVER_ERROR if isinstance(exc, InternalServerError):
if isinstance(exc, APIConnectionError): return PipelineMessages.LLM_SERVER_ERROR
return PipelineMessages.LLM_CONNECTION if isinstance(exc, APIConnectionError):
return str(exc) return PipelineMessages.LLM_CONNECTION
return safe_exception_message(exc)
except Exception:
return "Something went wrong when calling the LLM."
def llm_permanent_message(exc: Exception) -> str: def llm_permanent_message(exc: Exception) -> str:
if isinstance(exc, AuthenticationError): try:
return PipelineMessages.LLM_AUTH if isinstance(exc, AuthenticationError):
if isinstance(exc, PermissionDeniedError): return PipelineMessages.LLM_AUTH
return PipelineMessages.LLM_PERMISSION if isinstance(exc, PermissionDeniedError):
if isinstance(exc, NotFoundError): return PipelineMessages.LLM_PERMISSION
return PipelineMessages.LLM_NOT_FOUND if isinstance(exc, NotFoundError):
if isinstance(exc, BadRequestError): return PipelineMessages.LLM_NOT_FOUND
return PipelineMessages.LLM_BAD_REQUEST if isinstance(exc, BadRequestError):
if isinstance(exc, UnprocessableEntityError): return PipelineMessages.LLM_BAD_REQUEST
return PipelineMessages.LLM_UNPROCESSABLE if isinstance(exc, UnprocessableEntityError):
if isinstance(exc, APIResponseValidationError): return PipelineMessages.LLM_UNPROCESSABLE
return PipelineMessages.LLM_RESPONSE if isinstance(exc, APIResponseValidationError):
return str(exc) return PipelineMessages.LLM_RESPONSE
return safe_exception_message(exc)
except Exception:
return "Something went wrong when calling the LLM."
def embedding_message(exc: Exception) -> str: def embedding_message(exc: Exception) -> str:
if isinstance(exc, RuntimeError): try:
return PipelineMessages.EMBEDDING_FAILED if isinstance(exc, RuntimeError):
if isinstance(exc, OSError): return PipelineMessages.EMBEDDING_FAILED
return PipelineMessages.EMBEDDING_MODEL if isinstance(exc, OSError):
if isinstance(exc, MemoryError): return PipelineMessages.EMBEDDING_MODEL
return PipelineMessages.EMBEDDING_MEMORY if isinstance(exc, MemoryError):
if isinstance(exc, ValueError): return PipelineMessages.EMBEDDING_MEMORY
return PipelineMessages.EMBEDDING_INPUT if isinstance(exc, ValueError):
return str(exc) return PipelineMessages.EMBEDDING_INPUT
return safe_exception_message(exc)
except Exception:
return "Something went wrong when generating the embedding."

View file

@ -21,6 +21,26 @@ from app.indexing_pipeline.exceptions import (
embedding_message, embedding_message,
llm_permanent_message, llm_permanent_message,
llm_retryable_message, llm_retryable_message,
safe_exception_message,
)
from app.indexing_pipeline.pipeline_logger import (
PipelineLogContext,
log_batch_aborted,
log_chunking_overflow,
log_db_fatal_error,
log_db_transient_error,
log_doc_skipped_db,
log_doc_skipped_unknown,
log_document_queued,
log_document_requeued,
log_document_updated,
log_embedding_error,
log_index_started,
log_index_success,
log_permanent_llm_error,
log_race_condition,
log_retryable_llm_error,
log_unexpected_error,
) )
@ -38,8 +58,18 @@ class IndexingPipelineService:
""" """
documents = [] documents = []
seen_hashes: set[str] = set() seen_hashes: set[str] = set()
batch_ctx = PipelineLogContext(
connector_id=connector_docs[0].connector_id if connector_docs else 0,
search_space_id=connector_docs[0].search_space_id if connector_docs else 0,
unique_id="batch",
)
for connector_doc in connector_docs: for connector_doc in connector_docs:
ctx = PipelineLogContext(
connector_id=connector_doc.connector_id,
search_space_id=connector_doc.search_space_id,
unique_id=connector_doc.unique_id,
)
try: try:
unique_identifier_hash = compute_unique_identifier_hash(connector_doc) unique_identifier_hash = compute_unique_identifier_hash(connector_doc)
content_hash = compute_content_hash(connector_doc) content_hash = compute_content_hash(connector_doc)
@ -62,6 +92,7 @@ class IndexingPipelineService:
existing.status = DocumentStatus.pending() existing.status = DocumentStatus.pending()
existing.updated_at = datetime.now(UTC) existing.updated_at = datetime.now(UTC)
documents.append(existing) documents.append(existing)
log_document_requeued(ctx)
continue continue
existing.title = connector_doc.title existing.title = connector_doc.title
@ -71,6 +102,7 @@ class IndexingPipelineService:
existing.updated_at = datetime.now(UTC) existing.updated_at = datetime.now(UTC)
existing.status = DocumentStatus.pending() existing.status = DocumentStatus.pending()
documents.append(existing) documents.append(existing)
log_document_updated(ctx)
continue continue
duplicate = await self.session.execute( duplicate = await self.session.execute(
@ -95,23 +127,27 @@ class IndexingPipelineService:
) )
self.session.add(document) self.session.add(document)
documents.append(document) documents.append(document)
log_document_queued(ctx)
except FATAL_DB_ERRORS: except FATAL_DB_ERRORS as e:
# Session is broken — abort the entire batch, nothing else is safe to do. log_batch_aborted(ctx, e)
await self.session.rollback() await self.session.rollback()
return [] return []
except TRANSIENT_DB_ERRORS: except TRANSIENT_DB_ERRORS as e:
log_doc_skipped_db(ctx, e)
await self.session.rollback() await self.session.rollback()
except Exception: except Exception as e:
log_doc_skipped_unknown(ctx, e)
continue continue
try: try:
await self.session.commit() await self.session.commit()
return documents return documents
except IntegrityError: except IntegrityError as e:
# A concurrent worker committed a document with the same content_hash # A concurrent worker committed a document with the same content_hash
# or unique_identifier_hash between our check and our INSERT. # or unique_identifier_hash between our check and our INSERT.
# The document already exists — roll back and let the next sync run handle it. # The document already exists — roll back and let the next sync run handle it.
log_race_condition(batch_ctx)
await self.session.rollback() await self.session.rollback()
return [] return []
@ -121,7 +157,14 @@ class IndexingPipelineService:
""" """
Run summarization, embedding, and chunking for a document and persist the results. Run summarization, embedding, and chunking for a document and persist the results.
""" """
ctx = PipelineLogContext(
connector_id=connector_doc.connector_id,
search_space_id=connector_doc.search_space_id,
unique_id=connector_doc.unique_id,
doc_id=document.id,
)
try: try:
log_index_started(ctx)
document.status = DocumentStatus.processing() document.status = DocumentStatus.processing()
await self.session.commit() await self.session.commit()
@ -154,24 +197,32 @@ class IndexingPipelineService:
document.updated_at = datetime.now(UTC) document.updated_at = datetime.now(UTC)
document.status = DocumentStatus.ready() document.status = DocumentStatus.ready()
await self.session.commit() await self.session.commit()
log_index_success(ctx, chunk_count=len(chunks))
except RETRYABLE_LLM_ERRORS as e: except RETRYABLE_LLM_ERRORS as e:
log_retryable_llm_error(ctx, e)
await rollback_and_persist_failure(self.session, document, llm_retryable_message(e)) await rollback_and_persist_failure(self.session, document, llm_retryable_message(e))
except PERMANENT_LLM_ERRORS as e: except PERMANENT_LLM_ERRORS as e:
log_permanent_llm_error(ctx, e)
await rollback_and_persist_failure(self.session, document, llm_permanent_message(e)) await rollback_and_persist_failure(self.session, document, llm_permanent_message(e))
except EMBEDDING_ERRORS as e: except EMBEDDING_ERRORS as e:
log_embedding_error(ctx, e)
await rollback_and_persist_failure(self.session, document, embedding_message(e)) await rollback_and_persist_failure(self.session, document, embedding_message(e))
except RecursionError: except RecursionError as e:
log_chunking_overflow(ctx, e)
await rollback_and_persist_failure(self.session, document, PipelineMessages.CHUNKING_OVERFLOW) await rollback_and_persist_failure(self.session, document, PipelineMessages.CHUNKING_OVERFLOW)
except FATAL_DB_ERRORS: except FATAL_DB_ERRORS as e:
log_db_fatal_error(ctx, e)
raise raise
except TRANSIENT_DB_ERRORS: except TRANSIENT_DB_ERRORS as e:
log_db_transient_error(ctx, e)
await rollback_and_persist_failure(self.session, document, PipelineMessages.DB_TRANSIENT) await rollback_and_persist_failure(self.session, document, PipelineMessages.DB_TRANSIENT)
except Exception as e: except Exception as e:
await rollback_and_persist_failure(self.session, document, str(e)) log_unexpected_error(ctx, e)
await rollback_and_persist_failure(self.session, document, safe_exception_message(e))

View file

@ -0,0 +1,133 @@
import logging
from dataclasses import dataclass
logger = logging.getLogger(__name__)
@dataclass
class PipelineLogContext:
connector_id: int
search_space_id: int
unique_id: str # always available from ConnectorDocument
doc_id: int | None = None # set once the DB row exists (index phase only)
class LogMessages:
# prepare_for_indexing
DOCUMENT_QUEUED = "New document queued for indexing."
DOCUMENT_UPDATED = "Document content changed, re-queued for indexing."
DOCUMENT_REQUEUED = "Stuck document re-queued for indexing."
DOC_SKIPPED_DB = "Transient DB error — document skipped, will retry on next sync."
DOC_SKIPPED_UNKNOWN = "Unexpected error — document skipped."
BATCH_ABORTED = "Fatal DB error — aborting prepare batch."
RACE_CONDITION = "Concurrent worker beat us to the commit — rolling back batch."
# index
INDEX_STARTED = "Document indexing started."
INDEX_SUCCESS = "Document indexed successfully."
LLM_RETRYABLE = "Retryable LLM error — document marked failed, will retry on next sync."
LLM_PERMANENT = "Permanent LLM error — document marked failed."
EMBEDDING_FAILED = "Embedding error — document marked failed."
CHUNKING_OVERFLOW = "Chunking overflow — document marked failed."
DB_TRANSIENT = "Transient DB error — document marked failed."
DB_FATAL = "Fatal DB error — session is dead, re-raising."
UNEXPECTED = "Unexpected error — document marked failed."
def _format_context(ctx: PipelineLogContext) -> str:
parts = [
f"connector_id={ctx.connector_id}",
f"search_space_id={ctx.search_space_id}",
f"unique_id={ctx.unique_id}",
]
if ctx.doc_id is not None:
parts.append(f"doc_id={ctx.doc_id}")
return " ".join(parts)
def _build_message(msg: str, ctx: PipelineLogContext, **extra) -> str:
try:
parts = [msg, _format_context(ctx)]
for key, val in extra.items():
parts.append(f"{key}={val}")
return " ".join(parts)
except Exception:
return msg
def _safe_log(level_fn, msg: str, ctx: PipelineLogContext, exc_info=None, **extra) -> None:
# Logging must never raise — a broken log call inside an except block would
# chain with the original exception and mask it entirely.
try:
message = _build_message(msg, ctx, **extra)
level_fn(message, exc_info=exc_info)
except Exception:
pass
# ── prepare_for_indexing ──────────────────────────────────────────────────────
def log_document_queued(ctx: PipelineLogContext) -> None:
_safe_log(logger.info, LogMessages.DOCUMENT_QUEUED, ctx)
def log_document_updated(ctx: PipelineLogContext) -> None:
_safe_log(logger.info, LogMessages.DOCUMENT_UPDATED, ctx)
def log_document_requeued(ctx: PipelineLogContext) -> None:
_safe_log(logger.info, LogMessages.DOCUMENT_REQUEUED, ctx)
def log_doc_skipped_db(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.warning, LogMessages.DOC_SKIPPED_DB, ctx, error=exc)
def log_doc_skipped_unknown(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.warning, LogMessages.DOC_SKIPPED_UNKNOWN, ctx, exc_info=exc, error=exc)
def log_batch_aborted(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.critical, LogMessages.BATCH_ABORTED, ctx, error=exc)
def log_race_condition(ctx: PipelineLogContext) -> None:
_safe_log(logger.warning, LogMessages.RACE_CONDITION, ctx)
# ── index ─────────────────────────────────────────────────────────────────────
def log_index_started(ctx: PipelineLogContext) -> None:
_safe_log(logger.info, LogMessages.INDEX_STARTED, ctx)
def log_index_success(ctx: PipelineLogContext, chunk_count: int) -> None:
_safe_log(logger.info, LogMessages.INDEX_SUCCESS, ctx, chunk_count=chunk_count)
def log_retryable_llm_error(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.warning, LogMessages.LLM_RETRYABLE, ctx, error=exc)
def log_permanent_llm_error(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.error, LogMessages.LLM_PERMANENT, ctx, error=exc)
def log_embedding_error(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.error, LogMessages.EMBEDDING_FAILED, ctx, error=exc)
def log_chunking_overflow(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.error, LogMessages.CHUNKING_OVERFLOW, ctx, error=exc)
def log_db_transient_error(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.warning, LogMessages.DB_TRANSIENT, ctx, error=exc)
def log_db_fatal_error(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.critical, LogMessages.DB_FATAL, ctx, error=exc)
def log_unexpected_error(ctx: PipelineLogContext, exc: Exception) -> None:
_safe_log(logger.error, LogMessages.UNEXPECTED, ctx, exc_info=exc, error=exc)