feat: implement ETL pipeline with file classification and extraction services

This commit is contained in:
Anish Sarkar 2026-04-05 17:25:25 +05:30
parent 9c0af6569d
commit 5d22349dc1
6 changed files with 188 additions and 0 deletions

View file

@ -0,0 +1,39 @@
import ssl
import httpx
LLAMACLOUD_MAX_RETRIES = 5
LLAMACLOUD_BASE_DELAY = 10
LLAMACLOUD_MAX_DELAY = 120
LLAMACLOUD_RETRYABLE_EXCEPTIONS = (
ssl.SSLError,
httpx.ConnectError,
httpx.ConnectTimeout,
httpx.ReadError,
httpx.ReadTimeout,
httpx.WriteError,
httpx.WriteTimeout,
httpx.RemoteProtocolError,
httpx.LocalProtocolError,
ConnectionError,
ConnectionResetError,
TimeoutError,
OSError,
)
UPLOAD_BYTES_PER_SECOND_SLOW = 100 * 1024
MIN_UPLOAD_TIMEOUT = 120
MAX_UPLOAD_TIMEOUT = 1800
BASE_JOB_TIMEOUT = 600
PER_PAGE_JOB_TIMEOUT = 60
def calculate_upload_timeout(file_size_bytes: int) -> float:
estimated_time = (file_size_bytes / UPLOAD_BYTES_PER_SECOND_SLOW) * 1.5
return max(MIN_UPLOAD_TIMEOUT, min(estimated_time, MAX_UPLOAD_TIMEOUT))
def calculate_job_timeout(estimated_pages: int, file_size_bytes: int) -> float:
page_based_timeout = BASE_JOB_TIMEOUT + (estimated_pages * PER_PAGE_JOB_TIMEOUT)
size_based_timeout = BASE_JOB_TIMEOUT + (file_size_bytes / (10 * 1024 * 1024)) * 60
return max(page_based_timeout, size_based_timeout)

View file

@ -0,0 +1,21 @@
from pydantic import BaseModel, field_validator
class EtlRequest(BaseModel):
file_path: str
filename: str
estimated_pages: int = 0
@field_validator("filename")
@classmethod
def filename_must_not_be_empty(cls, v: str) -> str:
if not v.strip():
raise ValueError("filename must not be empty")
return v
class EtlResult(BaseModel):
markdown_content: str
etl_service: str
actual_pages: int = 0
content_type: str

View file

@ -0,0 +1,73 @@
from app.config import config as app_config
from app.etl_pipeline.etl_document import EtlRequest, EtlResult
from app.etl_pipeline.exceptions import EtlServiceUnavailableError
from app.etl_pipeline.file_classifier import FileCategory, classify_file
from app.etl_pipeline.parsers.audio import transcribe_audio
from app.etl_pipeline.parsers.direct_convert import convert_file_directly
from app.etl_pipeline.parsers.plaintext import read_plaintext
class EtlPipelineService:
"""Single pipeline for extracting markdown from files. All callers use this."""
async def extract(self, request: EtlRequest) -> EtlResult:
category = classify_file(request.filename)
if category == FileCategory.PLAINTEXT:
content = read_plaintext(request.file_path)
return EtlResult(
markdown_content=content,
etl_service="PLAINTEXT",
content_type="plaintext",
)
if category == FileCategory.DIRECT_CONVERT:
content = convert_file_directly(request.file_path, request.filename)
return EtlResult(
markdown_content=content,
etl_service="DIRECT_CONVERT",
content_type="direct_convert",
)
if category == FileCategory.AUDIO:
content = await transcribe_audio(request.file_path, request.filename)
return EtlResult(
markdown_content=content,
etl_service="AUDIO",
content_type="audio",
)
return await self._extract_document(request)
async def _extract_document(self, request: EtlRequest) -> EtlResult:
etl_service = app_config.ETL_SERVICE
if not etl_service:
raise EtlServiceUnavailableError(
"No ETL_SERVICE configured. "
"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env"
)
if etl_service == "DOCLING":
from app.etl_pipeline.parsers.docling import parse_with_docling
content = await parse_with_docling(request.file_path, request.filename)
elif etl_service == "UNSTRUCTURED":
from app.etl_pipeline.parsers.unstructured import parse_with_unstructured
content = await parse_with_unstructured(request.file_path)
elif etl_service == "LLAMACLOUD":
from app.etl_pipeline.parsers.llamacloud import parse_with_llamacloud
content = await parse_with_llamacloud(
request.file_path, request.estimated_pages
)
else:
raise EtlServiceUnavailableError(
f"Unknown ETL_SERVICE: {etl_service}"
)
return EtlResult(
markdown_content=content,
etl_service=etl_service,
content_type="document",
)

View file

@ -0,0 +1,6 @@
class EtlParseError(Exception):
"""Raised when an ETL parser fails to produce content."""
class EtlServiceUnavailableError(Exception):
"""Raised when the configured ETL_SERVICE is not recognised."""

View file

@ -0,0 +1,49 @@
from enum import Enum
from pathlib import PurePosixPath
PLAINTEXT_EXTENSIONS = frozenset(
{
".md", ".markdown", ".txt", ".text",
".json", ".jsonl", ".yaml", ".yml", ".toml", ".ini", ".cfg", ".conf", ".xml",
".css", ".scss", ".less", ".sass",
".py", ".pyw", ".pyi", ".pyx",
".js", ".jsx", ".ts", ".tsx", ".mjs", ".cjs",
".java", ".kt", ".kts", ".scala", ".groovy",
".c", ".h", ".cpp", ".cxx", ".cc", ".hpp", ".hxx",
".cs", ".fs", ".fsx",
".go", ".rs", ".rb", ".php", ".pl", ".pm", ".lua", ".swift",
".m", ".mm", ".r", ".jl",
".sh", ".bash", ".zsh", ".fish", ".bat", ".cmd", ".ps1",
".sql", ".graphql", ".gql",
".env", ".gitignore", ".dockerignore", ".editorconfig",
".makefile", ".cmake",
".log", ".rst", ".tex", ".bib", ".org", ".adoc", ".asciidoc",
".vue", ".svelte", ".astro",
".tf", ".hcl", ".proto",
}
)
AUDIO_EXTENSIONS = frozenset(
{".mp3", ".mp4", ".mpeg", ".mpga", ".m4a", ".wav", ".webm"}
)
DIRECT_CONVERT_EXTENSIONS = frozenset({".csv", ".tsv", ".html", ".htm"})
class FileCategory(Enum):
PLAINTEXT = "plaintext"
AUDIO = "audio"
DIRECT_CONVERT = "direct_convert"
DOCUMENT = "document"
def classify_file(filename: str) -> FileCategory:
suffix = PurePosixPath(filename).suffix.lower()
if suffix in PLAINTEXT_EXTENSIONS:
return FileCategory.PLAINTEXT
if suffix in AUDIO_EXTENSIONS:
return FileCategory.AUDIO
if suffix in DIRECT_CONVERT_EXTENSIONS:
return FileCategory.DIRECT_CONVERT
return FileCategory.DOCUMENT