feat(pipeline): enrich ETL and indexing failure telemetry

This commit is contained in:
Anish Sarkar 2026-05-22 17:49:46 +05:30
parent 6e03ab044a
commit c4abbd6e20
2 changed files with 52 additions and 4 deletions

View file

@ -30,6 +30,7 @@ class EtlPipelineService:
category = classify_file(request.filename) category = classify_file(request.filename)
start = time.perf_counter() start = time.perf_counter()
status = "success" status = "success"
error_category: str | None = None
result: EtlResult | None = None result: EtlResult | None = None
with ot.etl_extract_span( with ot.etl_extract_span(
content_type=category.value, content_type=category.value,
@ -75,8 +76,9 @@ class EtlPipelineService:
result = await self._extract_document(request) result = await self._extract_document(request)
return result return result
except Exception: except Exception as exc:
status = "error" status = "error"
error_category = ot_metrics.categorize_exception(exc)
raise raise
finally: finally:
with contextlib.suppress(Exception): with contextlib.suppress(Exception):
@ -94,6 +96,7 @@ class EtlPipelineService:
etl_service=result.etl_service if result else None, etl_service=result.etl_service if result else None,
content_type=result.content_type if result else category.value, content_type=result.content_type if result else category.value,
status=status, status=status,
error_category=error_category,
) )
async def _extract_image(self, request: EtlRequest) -> EtlResult: async def _extract_image(self, request: EtlRequest) -> EtlResult:
@ -134,11 +137,27 @@ class EtlPipelineService:
request.filename, request.filename,
exc_info=True, exc_info=True,
) )
ot.add_event(
"etl.fallback",
{
"fallback.from": "vision_llm",
"fallback.to": "document_parser",
"fallback.reason": ot_metrics.categorize_exception(exc),
},
)
else: else:
logging.info( logging.info(
"No vision LLM provided, falling back to document parser for %s", "No vision LLM provided, falling back to document parser for %s",
request.filename, request.filename,
) )
ot.add_event(
"etl.fallback",
{
"fallback.from": "vision_llm",
"fallback.to": "document_parser",
"fallback.reason": "not_configured",
},
)
try: try:
with ot.etl_ocr_span( with ot.etl_ocr_span(
@ -246,6 +265,13 @@ class EtlPipelineService:
# Common case: the configured ETL service can't OCR # Common case: the configured ETL service can't OCR
# this image format (or no service is configured at # this image format (or no service is configured at
# all). Don't spam warnings -- just no OCR for it. # all). Don't spam warnings -- just no OCR for it.
ot.add_event(
"etl.ocr.skipped",
{
"skip.reason": "unsupported_format",
"error.category": ot_metrics.categorize_exception(exc),
},
)
logging.debug("Skipping per-image OCR for %s: %s", image_name, exc) logging.debug("Skipping per-image OCR for %s: %s", image_name, exc)
return "" return ""
return ocr_result.markdown_content return ocr_result.markdown_content
@ -264,9 +290,17 @@ class EtlPipelineService:
sp.set_attribute("image.skipped.too_large", result.skipped_too_large) sp.set_attribute("image.skipped.too_large", result.skipped_too_large)
sp.set_attribute("image.skipped.duplicate", result.skipped_duplicate) sp.set_attribute("image.skipped.duplicate", result.skipped_duplicate)
sp.set_attribute("etl.status", "success") sp.set_attribute("etl.status", "success")
except Exception: except Exception as exc:
# Picture description is additive; never let it fail an # Picture description is additive; never let it fail an
# otherwise-successful document extraction. # otherwise-successful document extraction.
ot.add_event(
"etl.degraded",
{
"degraded.reason": "picture_describe_failed",
"degraded.action": "return_parser_output",
"error.category": ot_metrics.categorize_exception(exc),
},
)
logging.warning( logging.warning(
"Picture description failed for %s, returning parser output unchanged", "Picture description failed for %s, returning parser output unchanged",
request.filename, request.filename,
@ -319,7 +353,15 @@ class EtlPipelineService:
return await parse_with_azure_doc_intelligence( return await parse_with_azure_doc_intelligence(
request.file_path, processing_mode=mode_value request.file_path, processing_mode=mode_value
) )
except Exception: except Exception as exc:
ot.add_event(
"etl.fallback",
{
"fallback.from": "azure_di",
"fallback.to": "llamacloud",
"fallback.reason": ot_metrics.categorize_exception(exc),
},
)
logging.warning( logging.warning(
"Azure Document Intelligence failed for %s, " "Azure Document Intelligence failed for %s, "
"falling back to LlamaCloud", "falling back to LlamaCloud",

View file

@ -2,6 +2,7 @@ import asyncio
import contextlib import contextlib
import hashlib import hashlib
import logging import logging
import sys
import time import time
from collections.abc import Awaitable, Callable from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field from dataclasses import dataclass, field
@ -445,6 +446,7 @@ class IndexingPipelineService:
await self._enqueue_ai_sort_if_enabled(document) await self._enqueue_ai_sort_if_enabled(document)
except RETRYABLE_LLM_ERRORS as e: except RETRYABLE_LLM_ERRORS as e:
ot.record_error(persist_span, e)
log_retryable_llm_error(ctx, e) log_retryable_llm_error(ctx, e)
outcome_status = "requeued" outcome_status = "requeued"
await rollback_and_persist_failure( await rollback_and_persist_failure(
@ -452,24 +454,28 @@ class IndexingPipelineService:
) )
except PERMANENT_LLM_ERRORS as e: except PERMANENT_LLM_ERRORS as e:
ot.record_error(persist_span, e)
log_permanent_llm_error(ctx, e) log_permanent_llm_error(ctx, e)
await rollback_and_persist_failure( await rollback_and_persist_failure(
self.session, document, llm_permanent_message(e) self.session, document, llm_permanent_message(e)
) )
except RecursionError as e: except RecursionError as e:
ot.record_error(persist_span, e)
log_chunking_overflow(ctx, e) log_chunking_overflow(ctx, e)
await rollback_and_persist_failure( await rollback_and_persist_failure(
self.session, document, PipelineMessages.CHUNKING_OVERFLOW self.session, document, PipelineMessages.CHUNKING_OVERFLOW
) )
except EMBEDDING_ERRORS as e: except EMBEDDING_ERRORS as e:
ot.record_error(persist_span, e)
log_embedding_error(ctx, e) log_embedding_error(ctx, e)
await rollback_and_persist_failure( await rollback_and_persist_failure(
self.session, document, embedding_message(e) self.session, document, embedding_message(e)
) )
except Exception as e: except Exception as e:
ot.record_error(persist_span, e)
log_unexpected_error(ctx, e) log_unexpected_error(ctx, e)
await rollback_and_persist_failure( await rollback_and_persist_failure(
self.session, document, safe_exception_message(e) self.session, document, safe_exception_message(e)
@ -488,7 +494,7 @@ class IndexingPipelineService:
document_type=document_type, document_type=document_type,
status=outcome_status, status=outcome_status,
) )
persist_span_cm.__exit__(None, None, None) persist_span_cm.__exit__(*sys.exc_info())
return document return document
async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None: async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None: