diff --git a/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py b/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py index 4cdd387b0..496c6d0c3 100644 --- a/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py +++ b/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py @@ -30,6 +30,7 @@ class EtlPipelineService: category = classify_file(request.filename) start = time.perf_counter() status = "success" + error_category: str | None = None result: EtlResult | None = None with ot.etl_extract_span( content_type=category.value, @@ -75,8 +76,9 @@ class EtlPipelineService: result = await self._extract_document(request) return result - except Exception: + except Exception as exc: status = "error" + error_category = ot_metrics.categorize_exception(exc) raise finally: with contextlib.suppress(Exception): @@ -94,6 +96,7 @@ class EtlPipelineService: etl_service=result.etl_service if result else None, content_type=result.content_type if result else category.value, status=status, + error_category=error_category, ) async def _extract_image(self, request: EtlRequest) -> EtlResult: @@ -134,11 +137,27 @@ class EtlPipelineService: request.filename, exc_info=True, ) + ot.add_event( + "etl.fallback", + { + "fallback.from": "vision_llm", + "fallback.to": "document_parser", + "fallback.reason": ot_metrics.categorize_exception(exc), + }, + ) else: logging.info( "No vision LLM provided, falling back to document parser for %s", request.filename, ) + ot.add_event( + "etl.fallback", + { + "fallback.from": "vision_llm", + "fallback.to": "document_parser", + "fallback.reason": "not_configured", + }, + ) try: with ot.etl_ocr_span( @@ -246,6 +265,13 @@ class EtlPipelineService: # Common case: the configured ETL service can't OCR # this image format (or no service is configured at # all). Don't spam warnings -- just no OCR for it. + ot.add_event( + "etl.ocr.skipped", + { + "skip.reason": "unsupported_format", + "error.category": ot_metrics.categorize_exception(exc), + }, + ) logging.debug("Skipping per-image OCR for %s: %s", image_name, exc) return "" return ocr_result.markdown_content @@ -264,9 +290,17 @@ class EtlPipelineService: sp.set_attribute("image.skipped.too_large", result.skipped_too_large) sp.set_attribute("image.skipped.duplicate", result.skipped_duplicate) sp.set_attribute("etl.status", "success") - except Exception: + except Exception as exc: # Picture description is additive; never let it fail an # otherwise-successful document extraction. + ot.add_event( + "etl.degraded", + { + "degraded.reason": "picture_describe_failed", + "degraded.action": "return_parser_output", + "error.category": ot_metrics.categorize_exception(exc), + }, + ) logging.warning( "Picture description failed for %s, returning parser output unchanged", request.filename, @@ -319,7 +353,15 @@ class EtlPipelineService: return await parse_with_azure_doc_intelligence( request.file_path, processing_mode=mode_value ) - except Exception: + except Exception as exc: + ot.add_event( + "etl.fallback", + { + "fallback.from": "azure_di", + "fallback.to": "llamacloud", + "fallback.reason": ot_metrics.categorize_exception(exc), + }, + ) logging.warning( "Azure Document Intelligence failed for %s, " "falling back to LlamaCloud", diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index 2aa92bd9b..282bd6034 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -2,6 +2,7 @@ import asyncio import contextlib import hashlib import logging +import sys import time from collections.abc import Awaitable, Callable from dataclasses import dataclass, field @@ -445,6 +446,7 @@ class IndexingPipelineService: await self._enqueue_ai_sort_if_enabled(document) except RETRYABLE_LLM_ERRORS as e: + ot.record_error(persist_span, e) log_retryable_llm_error(ctx, e) outcome_status = "requeued" await rollback_and_persist_failure( @@ -452,24 +454,28 @@ class IndexingPipelineService: ) except PERMANENT_LLM_ERRORS as e: + ot.record_error(persist_span, e) log_permanent_llm_error(ctx, e) await rollback_and_persist_failure( self.session, document, llm_permanent_message(e) ) except RecursionError as e: + ot.record_error(persist_span, e) log_chunking_overflow(ctx, e) await rollback_and_persist_failure( self.session, document, PipelineMessages.CHUNKING_OVERFLOW ) except EMBEDDING_ERRORS as e: + ot.record_error(persist_span, e) log_embedding_error(ctx, e) await rollback_and_persist_failure( self.session, document, embedding_message(e) ) except Exception as e: + ot.record_error(persist_span, e) log_unexpected_error(ctx, e) await rollback_and_persist_failure( self.session, document, safe_exception_message(e) @@ -488,7 +494,7 @@ class IndexingPipelineService: document_type=document_type, status=outcome_status, ) - persist_span_cm.__exit__(None, None, None) + persist_span_cm.__exit__(*sys.exc_info()) return document async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None: