refactor: make Azure Document Intelligence an internal LLAMACLOUD accelerator instead of a standalone ETL service

This commit is contained in:
Anish Sarkar 2026-04-08 03:26:24 +05:30
parent 1fa8d1220b
commit 20fa93f0ba
9 changed files with 200 additions and 85 deletions

View file

@ -20,7 +20,7 @@ AUTH_TYPE=LOCAL
# Allow new user registrations (TRUE or FALSE) # Allow new user registrations (TRUE or FALSE)
# REGISTRATION_ENABLED=TRUE # REGISTRATION_ENABLED=TRUE
# Document parsing service: DOCLING, UNSTRUCTURED, LLAMACLOUD, or AZURE_DI # Document parsing service: DOCLING, UNSTRUCTURED, or LLAMACLOUD
ETL_SERVICE=DOCLING ETL_SERVICE=DOCLING
# Embedding model for vector search # Embedding model for vector search
@ -282,8 +282,7 @@ STT_SERVICE=local/base
# LlamaCloud (if ETL_SERVICE=LLAMACLOUD) # LlamaCloud (if ETL_SERVICE=LLAMACLOUD)
# LLAMA_CLOUD_API_KEY= # LLAMA_CLOUD_API_KEY=
# Optional: Azure Document Intelligence accelerator (used with LLAMACLOUD)
# Azure Document Intelligence (if ETL_SERVICE=AZURE_DI)
# AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/ # AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/
# AZURE_DI_KEY= # AZURE_DI_KEY=

View file

@ -190,11 +190,12 @@ PAGES_LIMIT=500
FIRECRAWL_API_KEY=fcr-01J0000000000000000000000 FIRECRAWL_API_KEY=fcr-01J0000000000000000000000
# File Parser Service # File Parser Service
ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING or AZURE_DI ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING
UNSTRUCTURED_API_KEY=Tpu3P0U8iy UNSTRUCTURED_API_KEY=Tpu3P0U8iy
LLAMA_CLOUD_API_KEY=llx-nnn LLAMA_CLOUD_API_KEY=llx-nnn
AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/ # Optional: Azure Document Intelligence accelerator (used when ETL_SERVICE=LLAMACLOUD)
AZURE_DI_KEY=your-key # AZURE_DI_ENDPOINT=https://your-resource.cognitiveservices.azure.com/
# AZURE_DI_KEY=your-key
# OPTIONAL: Add these for LangSmith Observability # OPTIONAL: Add these for LangSmith Observability
LANGSMITH_TRACING=true LANGSMITH_TRACING=true

View file

@ -394,10 +394,8 @@ class Config:
UNSTRUCTURED_API_KEY = os.getenv("UNSTRUCTURED_API_KEY") UNSTRUCTURED_API_KEY = os.getenv("UNSTRUCTURED_API_KEY")
elif ETL_SERVICE == "LLAMACLOUD": elif ETL_SERVICE == "LLAMACLOUD":
# LlamaCloud API Key
LLAMA_CLOUD_API_KEY = os.getenv("LLAMA_CLOUD_API_KEY") LLAMA_CLOUD_API_KEY = os.getenv("LLAMA_CLOUD_API_KEY")
# Optional: Azure Document Intelligence accelerator for supported file types
elif ETL_SERVICE == "AZURE_DI":
AZURE_DI_ENDPOINT = os.getenv("AZURE_DI_ENDPOINT") AZURE_DI_ENDPOINT = os.getenv("AZURE_DI_ENDPOINT")
AZURE_DI_KEY = os.getenv("AZURE_DI_KEY") AZURE_DI_KEY = os.getenv("AZURE_DI_KEY")

View file

@ -1,3 +1,5 @@
import logging
from app.config import config as app_config from app.config import config as app_config
from app.etl_pipeline.etl_document import EtlRequest, EtlResult from app.etl_pipeline.etl_document import EtlRequest, EtlResult
from app.etl_pipeline.exceptions import ( from app.etl_pipeline.exceptions import (
@ -56,7 +58,7 @@ class EtlPipelineService:
if not etl_service: if not etl_service:
raise EtlServiceUnavailableError( raise EtlServiceUnavailableError(
"No ETL_SERVICE configured. " "No ETL_SERVICE configured. "
"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, DOCLING, or AZURE_DI in your .env" "Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env"
) )
ext = PurePosixPath(request.filename).suffix.lower() ext = PurePosixPath(request.filename).suffix.lower()
@ -75,17 +77,7 @@ class EtlPipelineService:
content = await parse_with_unstructured(request.file_path) content = await parse_with_unstructured(request.file_path)
elif etl_service == "LLAMACLOUD": elif etl_service == "LLAMACLOUD":
from app.etl_pipeline.parsers.llamacloud import parse_with_llamacloud content = await self._extract_with_llamacloud(request)
content = await parse_with_llamacloud(
request.file_path, request.estimated_pages
)
elif etl_service == "AZURE_DI":
from app.etl_pipeline.parsers.azure_doc_intelligence import (
parse_with_azure_doc_intelligence,
)
content = await parse_with_azure_doc_intelligence(request.file_path)
else: else:
raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}") raise EtlServiceUnavailableError(f"Unknown ETL_SERVICE: {etl_service}")
@ -94,3 +86,42 @@ class EtlPipelineService:
etl_service=etl_service, etl_service=etl_service,
content_type="document", content_type="document",
) )
async def _extract_with_llamacloud(self, request: EtlRequest) -> str:
    """Parse *request* with the Azure DI accelerator when possible, else LlamaCloud.

    Azure Document Intelligence is an internal accelerator: cheaper and
    faster for its supported file types. It is tried first only when both
    ``AZURE_DI_ENDPOINT`` and ``AZURE_DI_KEY`` are configured AND the file
    extension is in Azure DI's supported set. Otherwise — or if Azure DI
    fails for any reason — LlamaCloud parses the file.

    Args:
        request: ETL request carrying ``file_path``, ``filename`` and
            ``estimated_pages``.

    Returns:
        The parsed document content (markdown text).
    """
    from pathlib import PurePosixPath

    # Imported lazily so the heavy parser SDKs load only when this path runs.
    from app.etl_pipeline.parsers.llamacloud import parse_with_llamacloud
    from app.utils.file_extensions import AZURE_DI_DOCUMENT_EXTENSIONS

    ext = PurePosixPath(request.filename).suffix.lower()
    # getattr with a default: the AZURE_DI_* attributes only exist on the
    # config object when the operator opted into the accelerator.
    azure_configured = bool(
        getattr(app_config, "AZURE_DI_ENDPOINT", None)
        and getattr(app_config, "AZURE_DI_KEY", None)
    )
    if azure_configured and ext in AZURE_DI_DOCUMENT_EXTENSIONS:
        try:
            from app.etl_pipeline.parsers.azure_doc_intelligence import (
                parse_with_azure_doc_intelligence,
            )

            return await parse_with_azure_doc_intelligence(request.file_path)
        except Exception:
            # Best-effort accelerator: any Azure DI failure (auth, network,
            # parsing) degrades to LlamaCloud instead of failing the job.
            # Use a module-named logger rather than the root logger so the
            # record carries this module's name and honors app log config.
            logging.getLogger(__name__).warning(
                "Azure Document Intelligence failed for %s, "
                "falling back to LlamaCloud",
                request.filename,
                exc_info=True,
            )
    return await parse_with_llamacloud(request.file_path, request.estimated_pages)

View file

@ -124,16 +124,27 @@ _SERVICE_MAP: dict[str, frozenset[str]] = {
"DOCLING": DOCLING_DOCUMENT_EXTENSIONS, "DOCLING": DOCLING_DOCUMENT_EXTENSIONS,
"LLAMACLOUD": LLAMAPARSE_DOCUMENT_EXTENSIONS, "LLAMACLOUD": LLAMAPARSE_DOCUMENT_EXTENSIONS,
"UNSTRUCTURED": UNSTRUCTURED_DOCUMENT_EXTENSIONS, "UNSTRUCTURED": UNSTRUCTURED_DOCUMENT_EXTENSIONS,
"AZURE_DI": AZURE_DI_DOCUMENT_EXTENSIONS,
} }
def get_document_extensions_for_service(etl_service: str | None) -> frozenset[str]:
    """Return the document extensions supported by *etl_service*.

    When *etl_service* is ``LLAMACLOUD`` and Azure Document Intelligence
    credentials are configured, the set is dynamically expanded to include
    Azure DI's supported extensions (e.g. ``.heif``).

    Falls back to the full union when the service is ``None`` or unknown.
    """
    supported = _SERVICE_MAP.get(etl_service or "", DOCUMENT_EXTENSIONS)
    if etl_service != "LLAMACLOUD":
        return supported

    # Local import to avoid a circular dependency with app.config at load time.
    from app.config import config as app_config

    endpoint = getattr(app_config, "AZURE_DI_ENDPOINT", None)
    key = getattr(app_config, "AZURE_DI_KEY", None)
    if endpoint and key:
        # Azure DI acts as an accelerator for LLAMACLOUD, so its extensions
        # become acceptable whenever its credentials are present.
        supported = supported | AZURE_DI_DOCUMENT_EXTENSIONS
    return supported
def is_supported_document_extension(filename: str) -> bool: def is_supported_document_extension(filename: str) -> bool:

View file

@ -250,21 +250,17 @@ async def test_extract_pdf_with_llamacloud(tmp_path, mocker):
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Slice 9b - AZURE_DI document parsing # Slice 9b - LLAMACLOUD + Azure DI accelerator
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
def _mock_azure_di(mocker, content="# Azure DI parsed"):
    """Wire up Azure DI mocks and return the fake client for assertions."""
    # Result object exposing only the .content attribute the parser reads.
    class FakeResult:
        pass

    FakeResult.content = content

    fake_poller = mocker.AsyncMock()
    fake_poller.result.return_value = FakeResult()

    # Async client usable as an async context manager, yielding itself.
    fake_client = mocker.AsyncMock()
    fake_client.begin_analyze_document.return_value = fake_poller
    fake_client.__aenter__ = mocker.AsyncMock(return_value=fake_client)
    fake_client.__aexit__ = mocker.AsyncMock(return_value=False)

    mocker.patch(
        "azure.ai.documentintelligence.aio.DocumentIntelligenceClient",
        return_value=fake_client,
    )
    mocker.patch(
        "azure.ai.documentintelligence.models.DocumentContentFormat",
        mocker.MagicMock(MARKDOWN="markdown"),
    )
    mocker.patch(
        "azure.core.credentials.AzureKeyCredential",
        return_value=mocker.MagicMock(),
    )
    return fake_client
def _mock_llamacloud(mocker, content="# LlamaCloud parsed"):
    """Wire up LlamaCloud mocks and return the fake parser for assertions."""
    # A minimal document object exposing only the .text attribute read by
    # the service; built dynamically so the content is bound per call.
    fake_doc = type("FakeDoc", (), {"text": content})()

    class FakeJobResult:
        pages = []

        def get_markdown_documents(self, split_by_page=True):
            return [fake_doc]

    parser_mock = mocker.AsyncMock()
    parser_mock.aparse.return_value = FakeJobResult()

    # Replace the real SDK entry points with the mocks above.
    mocker.patch("llama_cloud_services.LlamaParse", return_value=parser_mock)
    mocker.patch(
        "llama_cloud_services.parse.utils.ResultType",
        mocker.MagicMock(MD="md"),
    )
    return parser_mock
async def test_llamacloud_with_azure_di_uses_azure_for_pdf(tmp_path, mocker):
    """When Azure DI is configured, a supported extension (.pdf) is parsed by Azure DI."""
    # Arrange: fake PDF on disk, LLAMACLOUD selected, Azure DI creds present.
    pdf_file = tmp_path / "report.pdf"
    pdf_file.write_bytes(b"%PDF-1.4 fake content " * 10)
    mocker.patch("app.config.config.ETL_SERVICE", "LLAMACLOUD")
    mocker.patch("app.config.config.LLAMA_CLOUD_API_KEY", "fake-key", create=True)
    mocker.patch("app.config.config.AZURE_DI_ENDPOINT", "https://fake.cognitiveservices.azure.com/", create=True)
    mocker.patch("app.config.config.AZURE_DI_KEY", "fake-key", create=True)
    fake_client = _mock_azure_di(mocker, "# Azure DI parsed")
    fake_parser = _mock_llamacloud(mocker)

    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(pdf_file), filename="report.pdf")
    )

    # Azure DI produced the content, yet the reported service stays LLAMACLOUD
    # (Azure DI is an internal accelerator, not a user-facing service).
    assert result.markdown_content == "# Azure DI parsed"
    assert result.etl_service == "LLAMACLOUD"
    assert result.content_type == "document"
    fake_client.begin_analyze_document.assert_called_once()
    fake_parser.aparse.assert_not_called()
async def test_llamacloud_azure_di_fallback_on_failure(tmp_path, mocker):
    """When Azure DI fails, LlamaCloud is used as a fallback."""
    # Arrange: fake PDF, LLAMACLOUD selected, Azure DI creds present.
    pdf_file = tmp_path / "report.pdf"
    pdf_file.write_bytes(b"%PDF-1.4 fake content " * 10)
    mocker.patch("app.config.config.ETL_SERVICE", "LLAMACLOUD")
    mocker.patch("app.config.config.LLAMA_CLOUD_API_KEY", "fake-key", create=True)
    mocker.patch("app.config.config.AZURE_DI_ENDPOINT", "https://fake.cognitiveservices.azure.com/", create=True)
    mocker.patch("app.config.config.AZURE_DI_KEY", "fake-key", create=True)
    # Make the Azure DI parser blow up so the fallback path is exercised.
    mocker.patch(
        "app.etl_pipeline.parsers.azure_doc_intelligence.parse_with_azure_doc_intelligence",
        side_effect=RuntimeError("Azure DI unavailable"),
    )
    fake_parser = _mock_llamacloud(mocker, "# LlamaCloud fallback")

    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(pdf_file), filename="report.pdf", estimated_pages=5)
    )

    # The failure is swallowed and LlamaCloud delivers the content instead.
    assert result.markdown_content == "# LlamaCloud fallback"
    assert result.etl_service == "LLAMACLOUD"
    assert result.content_type == "document"
    fake_parser.aparse.assert_called_once()
async def test_llamacloud_skips_azure_di_for_unsupported_ext(tmp_path, mocker):
    """Azure DI is skipped for extensions it doesn't support (e.g. .epub)."""
    # Arrange: an .epub (outside Azure DI's supported set) with both services mocked.
    epub_file = tmp_path / "book.epub"
    epub_file.write_bytes(b"\x00" * 10)
    mocker.patch("app.config.config.ETL_SERVICE", "LLAMACLOUD")
    mocker.patch("app.config.config.LLAMA_CLOUD_API_KEY", "fake-key", create=True)
    mocker.patch("app.config.config.AZURE_DI_ENDPOINT", "https://fake.cognitiveservices.azure.com/", create=True)
    mocker.patch("app.config.config.AZURE_DI_KEY", "fake-key", create=True)
    fake_client = _mock_azure_di(mocker)
    fake_parser = _mock_llamacloud(mocker, "# Epub from LlamaCloud")

    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(epub_file), filename="book.epub", estimated_pages=50)
    )

    # LlamaCloud handled the file directly; Azure DI was never invoked.
    assert result.markdown_content == "# Epub from LlamaCloud"
    assert result.etl_service == "LLAMACLOUD"
    fake_client.begin_analyze_document.assert_not_called()
    fake_parser.aparse.assert_called_once()
async def test_llamacloud_without_azure_di_uses_llamacloud_directly(tmp_path, mocker):
    """When Azure DI is not configured, LlamaCloud handles all file types directly."""
    # Arrange: a fake PDF on disk, LLAMACLOUD selected, no Azure DI credentials.
    pdf_file = tmp_path / "report.pdf"
    pdf_file.write_bytes(b"%PDF-1.4 fake content " * 10)
    mocker.patch("app.config.config.ETL_SERVICE", "LLAMACLOUD")
    mocker.patch("app.config.config.LLAMA_CLOUD_API_KEY", "fake-key", create=True)
    # create=True: the AZURE_DI_* attributes may not exist on the config object
    # at all when the accelerator was never configured.
    mocker.patch("app.config.config.AZURE_DI_ENDPOINT", None, create=True)
    mocker.patch("app.config.config.AZURE_DI_KEY", None, create=True)
    fake_parser = _mock_llamacloud(mocker, "# Direct LlamaCloud")
    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(pdf_file), filename="report.pdf", estimated_pages=5)
    )
    # Content must come straight from LlamaCloud, with exactly one parse call.
    assert result.markdown_content == "# Direct LlamaCloud"
    assert result.etl_service == "LLAMACLOUD"
    assert result.content_type == "document"
    fake_parser.aparse.assert_called_once()
async def test_llamacloud_heif_accepted_only_with_azure_di(tmp_path, mocker):
    """.heif is accepted by LLAMACLOUD only when Azure DI credentials are set."""
    from app.etl_pipeline.exceptions import EtlUnsupportedFileError

    heif_file = tmp_path / "photo.heif"
    heif_file.write_bytes(b"\x00" * 100)
    mocker.patch("app.config.config.ETL_SERVICE", "LLAMACLOUD")
    mocker.patch("app.config.config.LLAMA_CLOUD_API_KEY", "fake-key", create=True)
    mocker.patch("app.config.config.AZURE_DI_ENDPOINT", None, create=True)
    mocker.patch("app.config.config.AZURE_DI_KEY", None, create=True)

    # Without Azure DI credentials, .heif is outside LLAMACLOUD's own set.
    with pytest.raises(EtlUnsupportedFileError, match="not supported by LLAMACLOUD"):
        await EtlPipelineService().extract(
            EtlRequest(file_path=str(heif_file), filename="photo.heif")
        )

    # Now configure the accelerator; the same file becomes parseable.
    mocker.patch("app.config.config.AZURE_DI_ENDPOINT", "https://fake.cognitiveservices.azure.com/")
    mocker.patch("app.config.config.AZURE_DI_KEY", "fake-key")
    fake_client = _mock_azure_di(mocker, "# HEIF from Azure DI")

    result = await EtlPipelineService().extract(
        EtlRequest(file_path=str(heif_file), filename="photo.heif")
    )
    assert result.markdown_content == "# HEIF from Azure DI"
    assert result.etl_service == "LLAMACLOUD"
    fake_client.begin_analyze_document.assert_called_once()
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Slice 10 - unknown extension falls through to document ETL # Slice 10 - unknown extension falls through to document ETL
@ -520,13 +604,9 @@ async def test_extract_zip_raises_unsupported_error(tmp_path):
("file.svg", "DOCLING", True), ("file.svg", "DOCLING", True),
("file.p7s", "UNSTRUCTURED", False), ("file.p7s", "UNSTRUCTURED", False),
("file.p7s", "LLAMACLOUD", True), ("file.p7s", "LLAMACLOUD", True),
("file.pdf", "AZURE_DI", False), ("file.heif", "LLAMACLOUD", True),
("file.docx", "AZURE_DI", False), ("file.heif", "DOCLING", True),
("file.heif", "AZURE_DI", False), ("file.heif", "UNSTRUCTURED", True),
("file.epub", "AZURE_DI", True),
("file.doc", "AZURE_DI", True),
("file.rtf", "AZURE_DI", True),
("file.svg", "AZURE_DI", True),
], ],
) )
def test_should_skip_for_service(filename, etl_service, expected_skip): def test_should_skip_for_service(filename, etl_service, expected_skip):

View file

@ -1,6 +1,6 @@
NEXT_PUBLIC_FASTAPI_BACKEND_URL=http://localhost:8000 NEXT_PUBLIC_FASTAPI_BACKEND_URL=http://localhost:8000
NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE=LOCAL or GOOGLE NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE=LOCAL or GOOGLE
NEXT_PUBLIC_ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING or AZURE_DI NEXT_PUBLIC_ETL_SERVICE=UNSTRUCTURED or LLAMACLOUD or DOCLING
NEXT_PUBLIC_ZERO_CACHE_URL=http://localhost:4848 NEXT_PUBLIC_ZERO_CACHE_URL=http://localhost:4848
# Contact Form Vars (optional) # Contact Form Vars (optional)

View file

@ -96,11 +96,6 @@ const FILE_TYPE_CONFIG: Record<string, Record<string, string[]>> = {
"image/tiff": [".tiff", ".tif"], "image/tiff": [".tiff", ".tif"],
...audioFileTypes, ...audioFileTypes,
}, },
AZURE_DI: {
...commonTypes,
"image/heic": [".heic"],
...audioFileTypes,
},
default: { default: {
...commonTypes, ...commonTypes,
"application/msword": [".doc"], "application/msword": [".doc"],

View file

@ -19,7 +19,7 @@ export const AUTH_TYPE = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_AUTH_TYPE || "G
// Placeholder: __NEXT_PUBLIC_FASTAPI_BACKEND_URL__ // Placeholder: __NEXT_PUBLIC_FASTAPI_BACKEND_URL__
export const BACKEND_URL = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL || "http://localhost:8000"; export const BACKEND_URL = process.env.NEXT_PUBLIC_FASTAPI_BACKEND_URL || "http://localhost:8000";
// ETL Service: "DOCLING", "UNSTRUCTURED", "LLAMACLOUD", or "AZURE_DI" // ETL Service: "DOCLING", "UNSTRUCTURED", or "LLAMACLOUD"
// Placeholder: __NEXT_PUBLIC_ETL_SERVICE__ // Placeholder: __NEXT_PUBLIC_ETL_SERVICE__
export const ETL_SERVICE = process.env.NEXT_PUBLIC_ETL_SERVICE || "DOCLING"; export const ETL_SERVICE = process.env.NEXT_PUBLIC_ETL_SERVICE || "DOCLING";