feat: add support for Azure Document Intelligence in ETL pipeline

This commit is contained in:
Anish Sarkar 2026-04-08 00:59:12 +05:30
parent 73a9c5fbd1
commit 1fa8d1220b
10 changed files with 248 additions and 5 deletions

View file

@ -20,7 +20,7 @@ AUTH_TYPE=LOCAL
# Allow new user registrations (TRUE or FALSE)
# REGISTRATION_ENABLED=TRUE
# Document parsing service: DOCLING, UNSTRUCTURED, or LLAMACLOUD
# Document parsing service: DOCLING, UNSTRUCTURED, LLAMACLOUD, or AZURE_DI
ETL_SERVICE=DOCLING
# Embedding model for vector search
@ -283,6 +283,10 @@ STT_SERVICE=local/base
# LlamaCloud (if ETL_SERVICE=LLAMACLOUD)
# LLAMA_CLOUD_API_KEY=
# Azure Document Intelligence (if ETL_SERVICE=AZURE_DI)
# AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/
# AZURE_DI_KEY=
# ------------------------------------------------------------------------------
# Observability (optional)
# ------------------------------------------------------------------------------

View file

@ -190,9 +190,11 @@ PAGES_LIMIT=500
FIRECRAWL_API_KEY=fcr-01J0000000000000000000000
# File Parser Service
ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING
ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING or AZURE_DI
UNSTRUCTURED_API_KEY=Tpu3P0U8iy
LLAMA_CLOUD_API_KEY=llx-nnn
AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/
AZURE_DI_KEY=your-key
# OPTIONAL: Add these for LangSmith Observability
LANGSMITH_TRACING=true

View file

@ -397,6 +397,10 @@ class Config:
# LlamaCloud API Key
LLAMA_CLOUD_API_KEY = os.getenv("LLAMA_CLOUD_API_KEY")
elif ETL_SERVICE == "AZURE_DI":
AZURE_DI_ENDPOINT = os.getenv("AZURE_DI_ENDPOINT")
AZURE_DI_KEY = os.getenv("AZURE_DI_KEY")
# Residential Proxy Configuration (anonymous-proxies.net)
# Used for web crawling and YouTube transcript fetching to avoid IP bans.
RESIDENTIAL_PROXY_USERNAME = os.getenv("RESIDENTIAL_PROXY_USERNAME")

View file

@ -56,7 +56,7 @@ class EtlPipelineService:
if not etl_service:
raise EtlServiceUnavailableError(
"No ETL_SERVICE configured. "
"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env"
"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, DOCLING, or AZURE_DI in your .env"
)
ext = PurePosixPath(request.filename).suffix.lower()
@ -80,6 +80,12 @@ class EtlPipelineService:
content = await parse_with_llamacloud(
request.file_path, request.estimated_pages
)
elif etl_service == "AZURE_DI":
from app.etl_pipeline.parsers.azure_doc_intelligence import (
parse_with_azure_doc_intelligence,
)
content = await parse_with_azure_doc_intelligence(request.file_path)
else:
raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}")

View file

@ -0,0 +1,93 @@
import asyncio
import logging
import os
import random
from app.config import config as app_config
# Retry policy for transient Azure failures: exponential backoff with jitter.
MAX_RETRIES = 5
# Initial backoff in seconds; doubles on each failed attempt.
BASE_DELAY = 10
# Upper bound (seconds) on any single backoff delay.
MAX_DELAY = 120
async def parse_with_azure_doc_intelligence(file_path: str) -> str:
    """Parse a local document with Azure Document Intelligence (prebuilt-read).

    Transient failures (network errors, 5xx responses) are retried up to
    ``MAX_RETRIES`` times with capped exponential backoff plus jitter.
    Authentication errors and 4xx responses are permanent and re-raised
    immediately.

    Args:
        file_path: Path to the file to analyze.

    Returns:
        The content extracted by the service in markdown format, or ""
        when the service returns an empty result.

    Raises:
        RuntimeError: If AZURE_DI_ENDPOINT / AZURE_DI_KEY are not configured,
            or (as a fallback) if all retries are exhausted without a stored
            exception.
        ClientAuthenticationError: On invalid credentials (never retried).
        HttpResponseError: On 4xx responses (never retried), or the last
            5xx response after exhausting retries.
        ServiceRequestError / ServiceResponseError: The last transient
            transport error after exhausting retries.
    """
    # Imported lazily so the azure SDK is only required when AZURE_DI is
    # the configured ETL service.
    from azure.ai.documentintelligence.aio import DocumentIntelligenceClient
    from azure.ai.documentintelligence.models import DocumentContentFormat
    from azure.core.credentials import AzureKeyCredential
    from azure.core.exceptions import (
        ClientAuthenticationError,
        HttpResponseError,
        ServiceRequestError,
        ServiceResponseError,
    )

    # Fail fast with an actionable message instead of an opaque SDK error
    # on the first request when the service is not configured.
    if not getattr(app_config, "AZURE_DI_ENDPOINT", None) or not getattr(
        app_config, "AZURE_DI_KEY", None
    ):
        raise RuntimeError(
            "AZURE_DI_ENDPOINT and AZURE_DI_KEY must be set when "
            "ETL_SERVICE=AZURE_DI"
        )

    file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
    retryable_exceptions = (ServiceRequestError, ServiceResponseError)
    last_exception: Exception | None = None
    attempt_errors: list[str] = []

    for attempt in range(1, MAX_RETRIES + 1):
        try:
            client = DocumentIntelligenceClient(
                endpoint=app_config.AZURE_DI_ENDPOINT,
                credential=AzureKeyCredential(app_config.AZURE_DI_KEY),
            )
            async with client:
                with open(file_path, "rb") as f:
                    poller = await client.begin_analyze_document(
                        "prebuilt-read",
                        body=f,
                        output_content_format=DocumentContentFormat.MARKDOWN,
                    )
                    result = await poller.result()
            if attempt > 1:
                # Lazy %-style args: only formatted if the record is emitted.
                logging.info(
                    "Azure Document Intelligence succeeded on attempt %d "
                    "after %d failures",
                    attempt,
                    len(attempt_errors),
                )
            return result.content or ""
        except ClientAuthenticationError:
            # Bad credentials will never succeed on retry.
            raise
        except HttpResponseError as e:
            # 4xx responses (bad request, unsupported content, ...) are
            # permanent; only server-side (5xx) failures are retried.
            if e.status_code and 400 <= e.status_code < 500:
                raise
            last_exception = e
            attempt_errors.append(
                f"Attempt {attempt}: {type(e).__name__} - {str(e)[:200]}"
            )
        except retryable_exceptions as e:
            last_exception = e
            attempt_errors.append(
                f"Attempt {attempt}: {type(e).__name__} - {str(e)[:200]}"
            )

        if attempt < MAX_RETRIES:
            # Exponential backoff capped at MAX_DELAY, with +/-25% jitter
            # to avoid synchronized retry storms across workers.
            base_delay = min(BASE_DELAY * (2 ** (attempt - 1)), MAX_DELAY)
            jitter = base_delay * 0.25 * (2 * random.random() - 1)
            delay = base_delay + jitter
            logging.warning(
                "Azure Document Intelligence failed (attempt %d/%d): %s. "
                "File: %.1fMB. Retrying in %.0fs...",
                attempt,
                MAX_RETRIES,
                attempt_errors[-1],
                file_size_mb,
                delay,
            )
            await asyncio.sleep(delay)
        else:
            logging.error(
                "Azure Document Intelligence failed after %d attempts. "
                "File size: %.1fMB. Errors: %s",
                MAX_RETRIES,
                file_size_mb,
                "; ".join(attempt_errors),
            )

    # All attempts exhausted: surface the last transient error, or a
    # RuntimeError as a defensive fallback.
    raise last_exception or RuntimeError(
        f"Azure Document Intelligence parsing failed after {MAX_RETRIES} retries. "
        f"File size: {file_size_mb:.1f}MB"
    )

View file

@ -93,6 +93,22 @@ UNSTRUCTURED_DOCUMENT_EXTENSIONS: frozenset[str] = frozenset(
}
)
# File extensions routed to Azure Document Intelligence (prebuilt-read):
# PDF, Office documents, and common image formats. ".heic" is included
# alongside ".heif" so the backend accepts what the frontend upload filter
# already allows ("image/heic": [".heic"]) — HEIC is the usual file
# extension for HEIF-encoded images.
AZURE_DI_DOCUMENT_EXTENSIONS: frozenset[str] = frozenset(
    {
        ".pdf",
        ".docx",
        ".xlsx",
        ".pptx",
        ".png",
        ".jpg",
        ".jpeg",
        ".bmp",
        ".tiff",
        ".tif",
        ".heif",
        ".heic",
    }
)
# ---------------------------------------------------------------------------
# Union (used by classify_file for routing) + service lookup
# ---------------------------------------------------------------------------
@ -101,12 +117,14 @@ DOCUMENT_EXTENSIONS: frozenset[str] = (
DOCLING_DOCUMENT_EXTENSIONS
| LLAMAPARSE_DOCUMENT_EXTENSIONS
| UNSTRUCTURED_DOCUMENT_EXTENSIONS
| AZURE_DI_DOCUMENT_EXTENSIONS
)
_SERVICE_MAP: dict[str, frozenset[str]] = {
"DOCLING": DOCLING_DOCUMENT_EXTENSIONS,
"LLAMACLOUD": LLAMAPARSE_DOCUMENT_EXTENSIONS,
"UNSTRUCTURED": UNSTRUCTURED_DOCUMENT_EXTENSIONS,
"AZURE_DI": AZURE_DI_DOCUMENT_EXTENSIONS,
}

View file

@ -249,6 +249,110 @@ async def test_extract_pdf_with_llamacloud(tmp_path, mocker):
assert result.content_type == "document"
# ---------------------------------------------------------------------------
# Slice 9b - AZURE_DI document parsing
# ---------------------------------------------------------------------------
async def test_extract_pdf_with_azure_di(tmp_path, mocker):
    """A .pdf file with ETL_SERVICE=AZURE_DI returns parsed markdown."""
    pdf_file = tmp_path / "report.pdf"
    pdf_file.write_bytes(b"%PDF-1.4 fake content " * 10)
    # Route the pipeline to Azure DI. create=True because the AZURE_DI_*
    # attributes only exist on Config when ETL_SERVICE == "AZURE_DI".
    mocker.patch("app.config.config.ETL_SERVICE", "AZURE_DI")
    mocker.patch("app.config.config.AZURE_DI_ENDPOINT", "https://fake.cognitiveservices.azure.com/", create=True)
    mocker.patch("app.config.config.AZURE_DI_KEY", "fake-key", create=True)

    # Minimal stand-in for the SDK's AnalyzeResult (only .content is read).
    class FakeResult:
        content = "# Azure DI parsed"

    fake_poller = mocker.AsyncMock()
    fake_poller.result.return_value = FakeResult()
    fake_client = mocker.AsyncMock()
    fake_client.begin_analyze_document.return_value = fake_poller
    # Async context-manager protocol used by `async with client:`.
    fake_client.__aenter__ = mocker.AsyncMock(return_value=fake_client)
    fake_client.__aexit__ = mocker.AsyncMock(return_value=False)
    # Patch the SDK entry points the parser imports lazily at call time.
    mocker.patch(
        "azure.ai.documentintelligence.aio.DocumentIntelligenceClient",
        return_value=fake_client,
    )
    mocker.patch(
        "azure.ai.documentintelligence.models.DocumentContentFormat",
        mocker.MagicMock(MARKDOWN="markdown"),
    )
    mocker.patch(
        "azure.core.credentials.AzureKeyCredential",
        return_value=mocker.MagicMock(),
    )

    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(pdf_file), filename="report.pdf")
    )

    assert result.markdown_content == "# Azure DI parsed"
    assert result.etl_service == "AZURE_DI"
    assert result.content_type == "document"
async def test_extract_docx_with_azure_di(tmp_path, mocker):
    """A .docx file with ETL_SERVICE=AZURE_DI routes correctly."""
    docx_file = tmp_path / "doc.docx"
    docx_file.write_bytes(b"PK fake docx")
    # Route the pipeline to Azure DI. create=True because the AZURE_DI_*
    # attributes only exist on Config when ETL_SERVICE == "AZURE_DI".
    mocker.patch("app.config.config.ETL_SERVICE", "AZURE_DI")
    mocker.patch("app.config.config.AZURE_DI_ENDPOINT", "https://fake.cognitiveservices.azure.com/", create=True)
    mocker.patch("app.config.config.AZURE_DI_KEY", "fake-key", create=True)

    # Minimal stand-in for the SDK's AnalyzeResult (only .content is read).
    class FakeResult:
        content = "Docx content from Azure"

    fake_poller = mocker.AsyncMock()
    fake_poller.result.return_value = FakeResult()
    fake_client = mocker.AsyncMock()
    fake_client.begin_analyze_document.return_value = fake_poller
    # Async context-manager protocol used by `async with client:`.
    fake_client.__aenter__ = mocker.AsyncMock(return_value=fake_client)
    fake_client.__aexit__ = mocker.AsyncMock(return_value=False)
    # Patch the SDK entry points the parser imports lazily at call time.
    mocker.patch(
        "azure.ai.documentintelligence.aio.DocumentIntelligenceClient",
        return_value=fake_client,
    )
    mocker.patch(
        "azure.ai.documentintelligence.models.DocumentContentFormat",
        mocker.MagicMock(MARKDOWN="markdown"),
    )
    mocker.patch(
        "azure.core.credentials.AzureKeyCredential",
        return_value=mocker.MagicMock(),
    )

    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(docx_file), filename="doc.docx")
    )

    assert result.markdown_content == "Docx content from Azure"
    assert result.etl_service == "AZURE_DI"
    assert result.content_type == "document"
async def test_extract_unsupported_ext_with_azure_di_raises(tmp_path, mocker):
    """AZURE_DI rejects extensions it doesn't support (e.g. .epub)."""
    from app.etl_pipeline.exceptions import EtlUnsupportedFileError

    # No SDK mocking: the unsupported-extension check is expected to
    # raise before any Azure client is created.
    mocker.patch("app.config.config.ETL_SERVICE", "AZURE_DI")
    epub_file = tmp_path / "book.epub"
    epub_file.write_bytes(b"\x00" * 10)
    with pytest.raises(EtlUnsupportedFileError, match="not supported by AZURE_DI"):
        await EtlPipelineService().extract(
            EtlRequest(file_path=str(epub_file), filename="book.epub")
        )
# ---------------------------------------------------------------------------
# Slice 10 - unknown extension falls through to document ETL
# ---------------------------------------------------------------------------
@ -416,6 +520,13 @@ async def test_extract_zip_raises_unsupported_error(tmp_path):
("file.svg", "DOCLING", True),
("file.p7s", "UNSTRUCTURED", False),
("file.p7s", "LLAMACLOUD", True),
("file.pdf", "AZURE_DI", False),
("file.docx", "AZURE_DI", False),
("file.heif", "AZURE_DI", False),
("file.epub", "AZURE_DI", True),
("file.doc", "AZURE_DI", True),
("file.rtf", "AZURE_DI", True),
("file.svg", "AZURE_DI", True),
],
)
def test_should_skip_for_service(filename, etl_service, expected_skip):

View file

@ -1,6 +1,6 @@
NEXT_PUBLIC_FASTAPI_BACKEND_URL=http://localhost:8000
NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE=LOCAL or GOOGLE
NEXT_PUBLIC_ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING
NEXT_PUBLIC_ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING or AZURE_DI
NEXT_PUBLIC_ZERO_CACHE_URL=http://localhost:4848
# Contact Form Vars (optional)

View file

@ -96,6 +96,11 @@ const FILE_TYPE_CONFIG: Record<string, Record<string, string[]>> = {
"image/tiff": [".tiff", ".tif"],
...audioFileTypes,
},
AZURE_DI: {
...commonTypes,
"image/heic": [".heic"],
...audioFileTypes,
},
default: {
...commonTypes,
"application/msword": [".doc"],

View file

@ -19,7 +19,7 @@ export const AUTH_TYPE = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE || "G
// Placeholder: __NEXT_PUBLIC_FASTAPI_BACKEND_URL__
export const BACKEND_URL = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL || "http://localhost:8000";
// ETL Service: "DOCLING" or "UNSTRUCTURED"
// ETL Service: "DOCLING", "UNSTRUCTURED", "LLAMACLOUD", or "AZURE_DI"
// Placeholder: __NEXT_PUBLIC_ETL_SERVICE__
export const ETL_SERVICE = process.env.NEXT_PUBLIC_ETL_SERVICE || "DOCLING";