feat(etl): instrument extraction spans and outcomes

This commit is contained in:
Anish Sarkar 2026-05-22 13:49:42 +05:30
parent 8bca29fe0d
commit 4e3a6dff46

View file

@ -1,4 +1,7 @@
import contextlib
import logging
import time
from pathlib import PurePosixPath
from app.config import config as app_config
from app.etl_pipeline.etl_document import EtlRequest, EtlResult
@ -10,6 +13,11 @@ from app.etl_pipeline.file_classifier import FileCategory, classify_file
from app.etl_pipeline.parsers.audio import transcribe_audio
from app.etl_pipeline.parsers.direct_convert import convert_file_directly
from app.etl_pipeline.parsers.plaintext import read_plaintext
from app.observability import metrics as ot_metrics, otel as ot
def _file_extension(filename: str) -> str:
return PurePosixPath(filename).suffix.lower() or "none"
class EtlPipelineService:
@ -20,49 +28,88 @@ class EtlPipelineService:
async def extract(self, request: EtlRequest) -> EtlResult:
category = classify_file(request.filename)
start = time.perf_counter()
status = "success"
result: EtlResult | None = None
with ot.etl_extract_span(
content_type=category.value,
file_extension=_file_extension(request.filename),
processing_mode=request.processing_mode.value,
) as sp:
try:
if category == FileCategory.UNSUPPORTED:
raise EtlUnsupportedFileError(
f"File type not supported for parsing: {request.filename}"
)
if category == FileCategory.UNSUPPORTED:
raise EtlUnsupportedFileError(
f"File type not supported for parsing: {request.filename}"
)
if category == FileCategory.PLAINTEXT:
content = read_plaintext(request.file_path)
result = EtlResult(
markdown_content=content,
etl_service="PLAINTEXT",
content_type="plaintext",
)
return result
if category == FileCategory.PLAINTEXT:
content = read_plaintext(request.file_path)
return EtlResult(
markdown_content=content,
etl_service="PLAINTEXT",
content_type="plaintext",
)
if category == FileCategory.DIRECT_CONVERT:
content = convert_file_directly(request.file_path, request.filename)
result = EtlResult(
markdown_content=content,
etl_service="DIRECT_CONVERT",
content_type="direct_convert",
)
return result
if category == FileCategory.DIRECT_CONVERT:
content = convert_file_directly(request.file_path, request.filename)
return EtlResult(
markdown_content=content,
etl_service="DIRECT_CONVERT",
content_type="direct_convert",
)
if category == FileCategory.AUDIO:
content = await transcribe_audio(request.file_path, request.filename)
result = EtlResult(
markdown_content=content,
etl_service="AUDIO",
content_type="audio",
)
return result
if category == FileCategory.AUDIO:
content = await transcribe_audio(request.file_path, request.filename)
return EtlResult(
markdown_content=content,
etl_service="AUDIO",
content_type="audio",
)
if category == FileCategory.IMAGE:
result = await self._extract_image(request)
return result
if category == FileCategory.IMAGE:
return await self._extract_image(request)
return await self._extract_document(request)
result = await self._extract_document(request)
return result
except Exception:
status = "error"
raise
finally:
with contextlib.suppress(Exception):
if result is not None:
sp.set_attribute("etl.service", result.etl_service)
sp.set_attribute("content.type", result.content_type)
sp.set_attribute("etl.status", status)
ot_metrics.record_etl_extract_duration(
time.perf_counter() - start,
etl_service=result.etl_service if result else None,
content_type=result.content_type if result else category.value,
status=status,
)
ot_metrics.record_etl_extract_outcome(
etl_service=result.etl_service if result else None,
content_type=result.content_type if result else category.value,
status=status,
)
async def _extract_image(self, request: EtlRequest) -> EtlResult:
if self._vision_llm:
try:
from app.etl_pipeline.parsers.vision_llm import parse_with_vision_llm
content = await parse_with_vision_llm(
request.file_path, request.filename, self._vision_llm
)
with ot.etl_parse_span(
etl_service="VISION_LLM",
content_type="image",
file_extension=_file_extension(request.filename),
) as sp:
content = await parse_with_vision_llm(
request.file_path, request.filename, self._vision_llm
)
sp.set_attribute("etl.status", "success")
return EtlResult(
markdown_content=content,
etl_service="VISION_LLM",
@ -94,7 +141,11 @@ class EtlPipelineService:
)
try:
return await self._extract_document(request)
with ot.etl_ocr_span(
etl_service=app_config.ETL_SERVICE,
file_extension=_file_extension(request.filename),
):
return await self._extract_document(request)
except (EtlUnsupportedFileError, EtlServiceUnavailableError):
raise EtlUnsupportedFileError(
f"Cannot process image {request.filename}: vision LLM "
@ -121,18 +172,27 @@ class EtlPipelineService:
f"File type {ext} is not supported by {etl_service}"
)
if etl_service == "DOCLING":
from app.etl_pipeline.parsers.docling import parse_with_docling
with ot.etl_parse_span(
etl_service=etl_service,
content_type="document",
file_extension=ext,
processing_mode=request.processing_mode.value,
) as sp:
if etl_service == "DOCLING":
from app.etl_pipeline.parsers.docling import parse_with_docling
content = await parse_with_docling(request.file_path, request.filename)
elif etl_service == "UNSTRUCTURED":
from app.etl_pipeline.parsers.unstructured import parse_with_unstructured
content = await parse_with_docling(request.file_path, request.filename)
elif etl_service == "UNSTRUCTURED":
from app.etl_pipeline.parsers.unstructured import (
parse_with_unstructured,
)
content = await parse_with_unstructured(request.file_path)
elif etl_service == "LLAMACLOUD":
content = await self._extract_with_llamacloud(request)
else:
raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}")
content = await parse_with_unstructured(request.file_path)
elif etl_service == "LLAMACLOUD":
content = await self._extract_with_llamacloud(request)
else:
raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}")
sp.set_attribute("etl.status", "success")
# When the operator opts into vision-LLM at ingest, walk the
# original file's embedded images and append a structured
@ -171,9 +231,14 @@ class EtlPipelineService:
async def _ocr_image(image_path: str, image_name: str) -> str:
try:
sub = EtlPipelineService(vision_llm=None)
ocr_result = await sub.extract(
EtlRequest(file_path=image_path, filename=image_name)
)
with ot.etl_picture_ocr_span(
file_extension=_file_extension(image_name)
) as sp:
ocr_result = await sub.extract(
EtlRequest(file_path=image_path, filename=image_name)
)
sp.set_attribute("etl.service", ocr_result.etl_service)
sp.set_attribute("etl.status", "success")
except (
EtlUnsupportedFileError,
EtlServiceUnavailableError,
@ -186,12 +251,19 @@ class EtlPipelineService:
return ocr_result.markdown_content
try:
result = await describe_pictures(
request.file_path,
request.filename,
self._vision_llm,
ocr_runner=_ocr_image,
)
with ot.etl_picture_describe_span() as sp:
result = await describe_pictures(
request.file_path,
request.filename,
self._vision_llm,
ocr_runner=_ocr_image,
)
sp.set_attribute("image.described.count", len(result.descriptions))
sp.set_attribute("image.failed.count", result.failed)
sp.set_attribute("image.skipped.too_small", result.skipped_too_small)
sp.set_attribute("image.skipped.too_large", result.skipped_too_large)
sp.set_attribute("image.skipped.duplicate", result.skipped_duplicate)
sp.set_attribute("etl.status", "success")
except Exception:
# Picture description is additive; never let it fail an
# otherwise-successful document extraction.