SurfSense/surfsense_backend/app/etl_pipeline/etl_pipeline_service.py

165 lines
6.3 KiB
Python

import logging
from app.config import config as app_config
from app.etl_pipeline.etl_document import EtlRequest, EtlResult
from app.etl_pipeline.exceptions import (
EtlServiceUnavailableError,
EtlUnsupportedFileError,
)
from app.etl_pipeline.file_classifier import FileCategory, classify_file
from app.etl_pipeline.parsers.audio import transcribe_audio
from app.etl_pipeline.parsers.direct_convert import convert_file_directly
from app.etl_pipeline.parsers.plaintext import read_plaintext
class EtlPipelineService:
"""Single pipeline for extracting markdown from files. All callers use this."""
def __init__(self, *, vision_llm=None):
self._vision_llm = vision_llm
async def extract(self, request: EtlRequest) -> EtlResult:
category = classify_file(request.filename)
if category == FileCategory.UNSUPPORTED:
raise EtlUnsupportedFileError(
f"File type not supported for parsing: {request.filename}"
)
if category == FileCategory.PLAINTEXT:
content = read_plaintext(request.file_path)
return EtlResult(
markdown_content=content,
etl_service="PLAINTEXT",
content_type="plaintext",
)
if category == FileCategory.DIRECT_CONVERT:
content = convert_file_directly(request.file_path, request.filename)
return EtlResult(
markdown_content=content,
etl_service="DIRECT_CONVERT",
content_type="direct_convert",
)
if category == FileCategory.AUDIO:
content = await transcribe_audio(request.file_path, request.filename)
return EtlResult(
markdown_content=content,
etl_service="AUDIO",
content_type="audio",
)
if category == FileCategory.IMAGE:
return await self._extract_image(request)
return await self._extract_document(request)
async def _extract_image(self, request: EtlRequest) -> EtlResult:
if self._vision_llm:
try:
from app.etl_pipeline.parsers.vision_llm import parse_with_vision_llm
content = await parse_with_vision_llm(
request.file_path, request.filename, self._vision_llm
)
return EtlResult(
markdown_content=content,
etl_service="VISION_LLM",
content_type="image",
)
except Exception:
logging.warning(
"Vision LLM failed for %s, falling back to document parser",
request.filename,
exc_info=True,
)
else:
logging.info(
"No vision LLM provided, falling back to document parser for %s",
request.filename,
)
try:
return await self._extract_document(request)
except (EtlUnsupportedFileError, EtlServiceUnavailableError):
raise EtlUnsupportedFileError(
f"Cannot process image {request.filename}: vision LLM "
f"{'failed' if self._vision_llm else 'not configured'} and "
f"document parser does not support this format"
) from None
async def _extract_document(self, request: EtlRequest) -> EtlResult:
from pathlib import PurePosixPath
from app.utils.file_extensions import get_document_extensions_for_service
etl_service = app_config.ETL_SERVICE
if not etl_service:
raise EtlServiceUnavailableError(
"No ETL_SERVICE configured. "
"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env"
)
ext = PurePosixPath(request.filename).suffix.lower()
supported = get_document_extensions_for_service(etl_service)
if ext not in supported:
raise EtlUnsupportedFileError(
f"File type {ext} is not supported by {etl_service}"
)
if etl_service == "DOCLING":
from app.etl_pipeline.parsers.docling import parse_with_docling
content = await parse_with_docling(request.file_path, request.filename)
elif etl_service == "UNSTRUCTURED":
from app.etl_pipeline.parsers.unstructured import parse_with_unstructured
content = await parse_with_unstructured(request.file_path)
elif etl_service == "LLAMACLOUD":
content = await self._extract_with_llamacloud(request)
else:
raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}")
return EtlResult(
markdown_content=content,
etl_service=etl_service,
content_type="document",
)
async def _extract_with_llamacloud(self, request: EtlRequest) -> str:
"""Try Azure Document Intelligence first (when configured) then LlamaCloud.
Azure DI is an internal accelerator: cheaper and faster for its supported
file types. If it is not configured, or the file extension is not in
Azure DI's supported set, LlamaCloud is used directly. If Azure DI
fails for any reason, LlamaCloud is used as a fallback.
"""
from pathlib import PurePosixPath
from app.utils.file_extensions import AZURE_DI_DOCUMENT_EXTENSIONS
ext = PurePosixPath(request.filename).suffix.lower()
azure_configured = bool(
getattr(app_config, "AZURE_DI_ENDPOINT", None)
and getattr(app_config, "AZURE_DI_KEY", None)
)
if azure_configured and ext in AZURE_DI_DOCUMENT_EXTENSIONS:
try:
from app.etl_pipeline.parsers.azure_doc_intelligence import (
parse_with_azure_doc_intelligence,
)
return await parse_with_azure_doc_intelligence(request.file_path)
except Exception:
logging.warning(
"Azure Document Intelligence failed for %s, "
"falling back to LlamaCloud",
request.filename,
exc_info=True,
)
from app.etl_pipeline.parsers.llamacloud import parse_with_llamacloud
return await parse_with_llamacloud(request.file_path, request.estimated_pages)