From 4e3a6dff465f979c3705a53942695ad2f899c7ee Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 22 May 2026 13:49:42 +0530 Subject: [PATCH] feat(etl): instrument extraction spans and outcomes --- .../app/etl_pipeline/etl_pipeline_service.py | 176 ++++++++++++------ 1 file changed, 124 insertions(+), 52 deletions(-) diff --git a/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py b/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py index 87e8138fd..4cdd387b0 100644 --- a/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py +++ b/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py @@ -1,4 +1,7 @@ +import contextlib import logging +import time +from pathlib import PurePosixPath from app.config import config as app_config from app.etl_pipeline.etl_document import EtlRequest, EtlResult @@ -10,6 +13,11 @@ from app.etl_pipeline.file_classifier import FileCategory, classify_file from app.etl_pipeline.parsers.audio import transcribe_audio from app.etl_pipeline.parsers.direct_convert import convert_file_directly from app.etl_pipeline.parsers.plaintext import read_plaintext +from app.observability import metrics as ot_metrics, otel as ot + + +def _file_extension(filename: str) -> str: + return PurePosixPath(filename).suffix.lower() or "none" class EtlPipelineService: @@ -20,49 +28,88 @@ class EtlPipelineService: async def extract(self, request: EtlRequest) -> EtlResult: category = classify_file(request.filename) + start = time.perf_counter() + status = "success" + result: EtlResult | None = None + with ot.etl_extract_span( + content_type=category.value, + file_extension=_file_extension(request.filename), + processing_mode=request.processing_mode.value, + ) as sp: + try: + if category == FileCategory.UNSUPPORTED: + raise EtlUnsupportedFileError( + f"File type not supported for parsing: {request.filename}" + ) - if category == FileCategory.UNSUPPORTED: - raise EtlUnsupportedFileError( - f"File type not supported for parsing: {request.filename}" - ) + if category == FileCategory.PLAINTEXT: + content = read_plaintext(request.file_path) + result = EtlResult( + markdown_content=content, + etl_service="PLAINTEXT", + content_type="plaintext", + ) + return result - if category == FileCategory.PLAINTEXT: - content = read_plaintext(request.file_path) - return EtlResult( - markdown_content=content, - etl_service="PLAINTEXT", - content_type="plaintext", - ) + if category == FileCategory.DIRECT_CONVERT: + content = convert_file_directly(request.file_path, request.filename) + result = EtlResult( + markdown_content=content, + etl_service="DIRECT_CONVERT", + content_type="direct_convert", + ) + return result - if category == FileCategory.DIRECT_CONVERT: - content = convert_file_directly(request.file_path, request.filename) - return EtlResult( - markdown_content=content, - etl_service="DIRECT_CONVERT", - content_type="direct_convert", - ) + if category == FileCategory.AUDIO: + content = await transcribe_audio(request.file_path, request.filename) + result = EtlResult( + markdown_content=content, + etl_service="AUDIO", + content_type="audio", + ) + return result - if category == FileCategory.AUDIO: - content = await transcribe_audio(request.file_path, request.filename) - return EtlResult( - markdown_content=content, - etl_service="AUDIO", - content_type="audio", - ) + if category == FileCategory.IMAGE: + result = await self._extract_image(request) + return result - if category == FileCategory.IMAGE: - return await self._extract_image(request) - - return await self._extract_document(request) + result = await self._extract_document(request) + return result + except Exception: + status = "error" + raise + finally: + with contextlib.suppress(Exception): + if result is not None: + sp.set_attribute("etl.service", result.etl_service) + sp.set_attribute("content.type", result.content_type) + sp.set_attribute("etl.status", status) + ot_metrics.record_etl_extract_duration( + time.perf_counter() - start, + etl_service=result.etl_service if result else None, + content_type=result.content_type if result else category.value, + status=status, + ) + ot_metrics.record_etl_extract_outcome( + etl_service=result.etl_service if result else None, + content_type=result.content_type if result else category.value, + status=status, + ) async def _extract_image(self, request: EtlRequest) -> EtlResult: if self._vision_llm: try: from app.etl_pipeline.parsers.vision_llm import parse_with_vision_llm - content = await parse_with_vision_llm( - request.file_path, request.filename, self._vision_llm - ) + with ot.etl_parse_span( + etl_service="VISION_LLM", + content_type="image", + file_extension=_file_extension(request.filename), + ) as sp: + content = await parse_with_vision_llm( + request.file_path, request.filename, self._vision_llm + ) + sp.set_attribute("etl.status", "success") return EtlResult( markdown_content=content, etl_service="VISION_LLM", @@ -94,7 +141,11 @@ class EtlPipelineService: ) try: - return await self._extract_document(request) + with ot.etl_ocr_span( + etl_service=app_config.ETL_SERVICE, + file_extension=_file_extension(request.filename), + ): + return await self._extract_document(request) except (EtlUnsupportedFileError, EtlServiceUnavailableError): raise EtlUnsupportedFileError( f"Cannot process image {request.filename}: vision LLM " @@ -121,18 +172,27 @@ class EtlPipelineService: f"File type {ext} is not supported by {etl_service}" ) - if etl_service == "DOCLING": - from app.etl_pipeline.parsers.docling import parse_with_docling + with ot.etl_parse_span( + etl_service=etl_service, + content_type="document", + file_extension=ext, + processing_mode=request.processing_mode.value, + ) as sp: + if etl_service == "DOCLING": + from app.etl_pipeline.parsers.docling import parse_with_docling - content = await parse_with_docling(request.file_path, request.filename) - elif etl_service == "UNSTRUCTURED": - from app.etl_pipeline.parsers.unstructured import parse_with_unstructured + content = await parse_with_docling(request.file_path, request.filename) + elif etl_service == "UNSTRUCTURED": + from app.etl_pipeline.parsers.unstructured import ( + parse_with_unstructured, + ) - content = await parse_with_unstructured(request.file_path) - elif etl_service == "LLAMACLOUD": - content = await self._extract_with_llamacloud(request) - else: - raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}") + content = await parse_with_unstructured(request.file_path) + elif etl_service == "LLAMACLOUD": + content = await self._extract_with_llamacloud(request) + else: + raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}") + sp.set_attribute("etl.status", "success") # When the operator opts into vision-LLM at ingest, walk the # original file's embedded images and append a structured @@ -171,9 +231,14 @@ class EtlPipelineService: async def _ocr_image(image_path: str, image_name: str) -> str: try: sub = EtlPipelineService(vision_llm=None) - ocr_result = await sub.extract( - EtlRequest(file_path=image_path, filename=image_name) - ) + with ot.etl_picture_ocr_span( + file_extension=_file_extension(image_name) + ) as sp: + ocr_result = await sub.extract( + EtlRequest(file_path=image_path, filename=image_name) + ) + sp.set_attribute("etl.service", ocr_result.etl_service) + sp.set_attribute("etl.status", "success") except ( EtlUnsupportedFileError, EtlServiceUnavailableError, @@ -186,12 +251,19 @@ class EtlPipelineService: return ocr_result.markdown_content try: - result = await describe_pictures( - request.file_path, - request.filename, - self._vision_llm, - ocr_runner=_ocr_image, - ) + with ot.etl_picture_describe_span() as sp: + result = await describe_pictures( + request.file_path, + request.filename, + self._vision_llm, + ocr_runner=_ocr_image, + ) + sp.set_attribute("image.described.count", len(result.descriptions)) + sp.set_attribute("image.failed.count", result.failed) + sp.set_attribute("image.skipped.too_small", result.skipped_too_small) + sp.set_attribute("image.skipped.too_large", result.skipped_too_large) + sp.set_attribute("image.skipped.duplicate", result.skipped_duplicate) + sp.set_attribute("etl.status", "success") except Exception: # Picture description is additive; never let it fail an # otherwise-successful document extraction.