refactor: streamline file processing by integrating ETL pipeline for all file types and removing redundant functions

Anish Sarkar 2026-04-05 17:45:18 +05:30
parent 8224360afa
commit 87af012a60
2 changed files with 123 additions and 840 deletions
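For orientation, every upload now funnels through a single extraction call. The sketch below is illustrative only, assembled from names that appear in the diff itself (EtlRequest, EtlPipelineService.extract, and the markdown_content / etl_service fields of its result); the wrapper function name is hypothetical, and the real code wraps this call in notifications, task logging, and page-limit accounting.

from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService


# Hypothetical wrapper: one pipeline call replaces the old per-type branches
# (plaintext read, csv/tsv/html direct-convert, audio transcription, and the
# Unstructured/LlamaCloud/Docling dispatch).
async def extract_markdown(file_path: str, filename: str) -> tuple[str, str]:
    result = await EtlPipelineService().extract(
        EtlRequest(file_path=file_path, filename=filename)
    )
    return result.markdown_content, result.etl_service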

View file

@@ -44,132 +44,6 @@ from .base import (
logger,
)
PLAINTEXT_EXTENSIONS = frozenset(
{
".md",
".markdown",
".txt",
".text",
".json",
".jsonl",
".yaml",
".yml",
".toml",
".ini",
".cfg",
".conf",
".xml",
".css",
".scss",
".less",
".sass",
".py",
".pyw",
".pyi",
".pyx",
".js",
".jsx",
".ts",
".tsx",
".mjs",
".cjs",
".java",
".kt",
".kts",
".scala",
".groovy",
".c",
".h",
".cpp",
".cxx",
".cc",
".hpp",
".hxx",
".cs",
".fs",
".fsx",
".go",
".rs",
".rb",
".php",
".pl",
".pm",
".lua",
".swift",
".m",
".mm",
".r",
".R",
".jl",
".sh",
".bash",
".zsh",
".fish",
".bat",
".cmd",
".ps1",
".sql",
".graphql",
".gql",
".env",
".gitignore",
".dockerignore",
".editorconfig",
".makefile",
".cmake",
".log",
".rst",
".tex",
".bib",
".org",
".adoc",
".asciidoc",
".vue",
".svelte",
".astro",
".tf",
".hcl",
".proto",
}
)
AUDIO_EXTENSIONS = frozenset(
{
".mp3",
".mp4",
".mpeg",
".mpga",
".m4a",
".wav",
".webm",
}
)
DIRECT_CONVERT_EXTENSIONS = frozenset({".csv", ".tsv", ".html", ".htm"})
def _is_plaintext_file(filename: str) -> bool:
return Path(filename).suffix.lower() in PLAINTEXT_EXTENSIONS
def _is_audio_file(filename: str) -> bool:
return Path(filename).suffix.lower() in AUDIO_EXTENSIONS
def _is_direct_convert_file(filename: str) -> bool:
return Path(filename).suffix.lower() in DIRECT_CONVERT_EXTENSIONS
def _needs_etl(filename: str) -> bool:
"""File is not plaintext, not audio, and not direct-convert — requires ETL."""
return (
not _is_plaintext_file(filename)
and not _is_audio_file(filename)
and not _is_direct_convert_file(filename)
)
HeartbeatCallbackType = Callable[[int], Awaitable[None]]
@@ -278,57 +152,21 @@ def scan_folder(
return files
-def _read_plaintext_file(file_path: str) -> str:
-    """Read a plaintext/text-based file as UTF-8."""
-    with open(file_path, encoding="utf-8", errors="replace") as f:
-        content = f.read()
-    if "\x00" in content:
-        raise ValueError(
-            f"File contains null bytes — likely a binary file opened as text: {file_path}"
-        )
-    return content
async def _read_file_content(file_path: str, filename: str) -> str:
-    """Read file content, using ETL for binary formats.
+    """Read file content via the unified ETL pipeline.
-    Plaintext files are read directly. Audio and document files (PDF, DOCX, etc.)
-    are routed through the configured ETL service (same as Google Drive / OneDrive).
-    Raises ValueError if the file cannot be parsed (e.g. no ETL service configured
-    for a binary file).
+    All file types (plaintext, audio, direct-convert, document) are handled
+    by ``EtlPipelineService``.
    """
-    if _is_plaintext_file(filename):
-        return _read_plaintext_file(file_path)
+    from app.etl_pipeline.etl_document import EtlRequest
+    from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
-    if _is_direct_convert_file(filename):
-        from app.tasks.document_processors._direct_converters import (
-            convert_file_directly,
-        )
-        return convert_file_directly(file_path, filename)
-    if _is_audio_file(filename):
-        etl_service = config.ETL_SERVICE if hasattr(config, "ETL_SERVICE") else None
-        stt_service_val = config.STT_SERVICE if hasattr(config, "STT_SERVICE") else None
-        if not stt_service_val and not etl_service:
-            raise ValueError(
-                f"No STT_SERVICE configured — cannot transcribe audio file: {filename}"
-            )
-    if _needs_etl(filename):
-        etl_service = getattr(config, "ETL_SERVICE", None)
-        if not etl_service:
-            raise ValueError(
-                f"No ETL_SERVICE configured — cannot parse binary file: {filename}. "
-                f"Set ETL_SERVICE to UNSTRUCTURED, LLAMACLOUD, or DOCLING in your .env"
-            )
-        from app.connectors.onedrive.content_extractor import (
-            _parse_file_to_markdown,
+    result = await EtlPipelineService().extract(
+        EtlRequest(file_path=file_path, filename=filename)
    )
-        return await _parse_file_to_markdown(file_path, filename)
+    return result.markdown_content
def _content_hash(content: str, search_space_id: int) -> str:

View file
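The file below routes on the shared classifier instead of the extension sets deleted above. As a minimal sketch of that routing decision, assuming classify_file and FileCategory from app.etl_pipeline.file_classifier behave as this diff suggests (PLAINTEXT, DIRECT_CONVERT, AUDIO, and DOCUMENT values, with only DOCUMENT uploads subject to page-limit accounting); the helper name here is hypothetical:

from app.etl_pipeline.file_classifier import FileCategory, classify_file


def needs_page_accounting(filename: str) -> bool:
    # Assumption from the diff below: only DOCUMENT uploads go through page
    # estimation and update_page_usage; plaintext, direct-convert, and audio
    # files skip the page-limit path entirely.
    return classify_file(filename) == FileCategory.DOCUMENT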

@@ -1,14 +1,8 @@
"""
File document processors orchestrating content extraction and indexing.
-This module is the public entry point for file processing. It delegates to
-specialised sub-modules that each own a single concern:
-- ``_constants`` file type classification and configuration constants
-- ``_helpers`` document deduplication, migration, connector helpers
-- ``_direct_converters`` lossless file-to-markdown for csv/tsv/html
-- ``_etl`` ETL parsing strategies (Unstructured, LlamaCloud, Docling)
-- ``_save`` unified document creation / update logic
+Delegates content extraction to ``app.etl_pipeline.EtlPipelineService`` and
+keeps only orchestration concerns (notifications, logging, page limits, saving).
"""
from __future__ import annotations
@@ -17,38 +11,19 @@ import contextlib
import logging
import os
from dataclasses import dataclass, field
-from logging import ERROR, getLogger
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config as app_config
from app.db import Document, Log, Notification
from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService
-from ._constants import FileCategory, classify_file
-from ._direct_converters import convert_file_directly
-from ._etl import (
-    parse_with_docling,
-    parse_with_llamacloud_retry,
-    parse_with_unstructured,
-)
from ._helpers import update_document_from_connector
-from ._save import (
-    add_received_file_document_using_docling,
-    add_received_file_document_using_llamacloud,
-    add_received_file_document_using_unstructured,
-    save_file_document,
-)
+from ._save import save_file_document
from .markdown_processor import add_received_markdown_file_document
# Re-export public API so existing ``from file_processors import …`` keeps working.
__all__ = [
"add_received_file_document_using_docling",
"add_received_file_document_using_llamacloud",
"add_received_file_document_using_unstructured",
"parse_with_llamacloud_retry",
"process_file_in_background",
"process_file_in_background_with_document",
"save_file_document",
@@ -142,35 +117,31 @@ async def _log_page_divergence(
# ===================================================================
-async def _process_markdown_upload(ctx: _ProcessingContext) -> Document | None:
-    """Read a markdown / text file and create or update a document."""
-    await _notify(ctx, "parsing", "Reading file")
+async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | None:
+    """Extract content from a non-document file (plaintext/direct_convert/audio) via the unified ETL pipeline."""
+    from app.etl_pipeline.etl_document import EtlRequest
+    from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
+    await _notify(ctx, "parsing", "Processing file")
    await ctx.task_logger.log_task_progress(
        ctx.log_entry,
-        f"Processing markdown/text file: {ctx.filename}",
-        {"file_type": "markdown", "processing_stage": "reading_file"},
+        f"Processing file: {ctx.filename}",
+        {"processing_stage": "extracting"},
    )
-    with open(ctx.file_path, encoding="utf-8") as f:
-        markdown_content = f.read()
+    etl_result = await EtlPipelineService().extract(
+        EtlRequest(file_path=ctx.file_path, filename=ctx.filename)
+    )
    with contextlib.suppress(Exception):
        os.unlink(ctx.file_path)
    await _notify(ctx, "chunking")
-    await ctx.task_logger.log_task_progress(
-        ctx.log_entry,
-        f"Creating document from markdown content: {ctx.filename}",
-        {
-            "processing_stage": "creating_document",
-            "content_length": len(markdown_content),
-        },
-    )
    result = await add_received_markdown_file_document(
        ctx.session,
        ctx.filename,
-        markdown_content,
+        etl_result.markdown_content,
        ctx.search_space_id,
        ctx.user_id,
        ctx.connector,
@@ -181,179 +152,19 @@ async def _process_markdown_upload(ctx: _ProcessingContext) -> Document | None:
    if result:
        await ctx.task_logger.log_task_success(
            ctx.log_entry,
-            f"Successfully processed markdown file: {ctx.filename}",
+            f"Successfully processed file: {ctx.filename}",
            {
                "document_id": result.id,
                "content_hash": result.content_hash,
-                "file_type": "markdown",
+                "file_type": etl_result.content_type,
+                "etl_service": etl_result.etl_service,
            },
        )
    else:
        await ctx.task_logger.log_task_success(
            ctx.log_entry,
-            f"Markdown file already exists (duplicate): {ctx.filename}",
-            {"duplicate_detected": True, "file_type": "markdown"},
        )
    return result
async def _process_direct_convert_upload(ctx: _ProcessingContext) -> Document | None:
"""Convert a text-based file (csv/tsv/html) to markdown without ETL."""
await _notify(ctx, "parsing", "Converting file")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Direct-converting file to markdown: {ctx.filename}",
{"file_type": "direct_convert", "processing_stage": "converting"},
)
markdown_content = convert_file_directly(ctx.file_path, ctx.filename)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
await _notify(ctx, "chunking")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Creating document from converted content: {ctx.filename}",
{
"processing_stage": "creating_document",
"content_length": len(markdown_content),
},
)
result = await add_received_markdown_file_document(
ctx.session,
ctx.filename,
markdown_content,
ctx.search_space_id,
ctx.user_id,
ctx.connector,
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
if result:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully direct-converted file: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "direct_convert",
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Direct-converted file already exists (duplicate): {ctx.filename}",
{"duplicate_detected": True, "file_type": "direct_convert"},
)
return result
async def _process_audio_upload(ctx: _ProcessingContext) -> Document | None:
"""Transcribe an audio file and create or update a document."""
await _notify(ctx, "parsing", "Transcribing audio")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Processing audio file for transcription: {ctx.filename}",
{"file_type": "audio", "processing_stage": "starting_transcription"},
)
stt_service_type = (
"local"
if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
else "external"
)
if stt_service_type == "local":
from app.services.stt_service import stt_service
try:
stt_result = stt_service.transcribe_file(ctx.file_path)
transcribed_text = stt_result.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
transcribed_text = (
f"# Transcription of {ctx.filename}\n\n{transcribed_text}"
)
except Exception as e:
raise HTTPException(
status_code=422,
detail=f"Failed to transcribe audio file {ctx.filename}: {e!s}",
) from e
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Local STT transcription completed: {ctx.filename}",
{
"processing_stage": "local_transcription_complete",
"language": stt_result.get("language"),
"confidence": stt_result.get("language_probability"),
"duration": stt_result.get("duration"),
},
)
else:
from litellm import atranscription
with open(ctx.file_path, "rb") as audio_file:
transcription_kwargs: dict = {
"model": app_config.STT_SERVICE,
"file": audio_file,
"api_key": app_config.STT_SERVICE_API_KEY,
}
if app_config.STT_SERVICE_API_BASE:
transcription_kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
transcription_response = await atranscription(**transcription_kwargs)
transcribed_text = transcription_response.get("text", "")
if not transcribed_text:
raise ValueError("Transcription returned empty text")
transcribed_text = f"# Transcription of {ctx.filename}\n\n{transcribed_text}"
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Transcription completed, creating document: {ctx.filename}",
{
"processing_stage": "transcription_complete",
"transcript_length": len(transcribed_text),
},
)
await _notify(ctx, "chunking")
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
result = await add_received_markdown_file_document(
ctx.session,
ctx.filename,
transcribed_text,
ctx.search_space_id,
ctx.user_id,
ctx.connector,
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
if result:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully transcribed and processed audio file: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "audio",
"transcript_length": len(transcribed_text),
"stt_service": stt_service_type,
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Audio file transcript already exists (duplicate): {ctx.filename}",
{"duplicate_detected": True, "file_type": "audio"},
f"File already exists (duplicate): {ctx.filename}",
{"duplicate_detected": True, "file_type": etl_result.content_type},
)
return result
@@ -363,279 +174,10 @@ async def _process_audio_upload(ctx: _ProcessingContext) -> Document | None:
# ---------------------------------------------------------------------------
async def _etl_unstructured(
ctx: _ProcessingContext,
page_limit_service,
estimated_pages: int,
) -> Document | None:
"""Parse and save via the Unstructured ETL service."""
await _notify(ctx, "parsing", "Extracting content")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Processing file with Unstructured ETL: {ctx.filename}",
{
"file_type": "document",
"etl_service": "UNSTRUCTURED",
"processing_stage": "loading",
},
)
docs = await parse_with_unstructured(ctx.file_path)
await _notify(ctx, "chunking", chunks_count=len(docs))
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Unstructured ETL completed, creating document: {ctx.filename}",
{"processing_stage": "etl_complete", "elements_count": len(docs)},
)
actual_pages = page_limit_service.estimate_pages_from_elements(docs)
final_pages = max(estimated_pages, actual_pages)
await _log_page_divergence(
ctx.task_logger,
ctx.log_entry,
ctx.filename,
estimated_pages,
actual_pages,
final_pages,
)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
result = await add_received_file_document_using_unstructured(
ctx.session,
ctx.filename,
docs,
ctx.search_space_id,
ctx.user_id,
ctx.connector,
enable_summary=ctx.enable_summary,
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
if result:
await page_limit_service.update_page_usage(
ctx.user_id, final_pages, allow_exceed=True
)
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully processed file with Unstructured: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "document",
"etl_service": "UNSTRUCTURED",
"pages_processed": final_pages,
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Document already exists (duplicate): {ctx.filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "UNSTRUCTURED",
},
)
return result
async def _etl_llamacloud(
ctx: _ProcessingContext,
page_limit_service,
estimated_pages: int,
) -> Document | None:
"""Parse and save via the LlamaCloud ETL service."""
await _notify(ctx, "parsing", "Extracting content")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Processing file with LlamaCloud ETL: {ctx.filename}",
{
"file_type": "document",
"etl_service": "LLAMACLOUD",
"processing_stage": "parsing",
"estimated_pages": estimated_pages,
},
)
raw_result = await parse_with_llamacloud_retry(
file_path=ctx.file_path,
estimated_pages=estimated_pages,
task_logger=ctx.task_logger,
log_entry=ctx.log_entry,
)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
markdown_documents = await raw_result.aget_markdown_documents(split_by_page=False)
await _notify(ctx, "chunking", chunks_count=len(markdown_documents))
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"LlamaCloud parsing completed, creating documents: {ctx.filename}",
{
"processing_stage": "parsing_complete",
"documents_count": len(markdown_documents),
},
)
if not markdown_documents:
await ctx.task_logger.log_task_failure(
ctx.log_entry,
f"LlamaCloud parsing returned no documents: {ctx.filename}",
"ETL service returned empty document list",
{"error_type": "EmptyDocumentList", "etl_service": "LLAMACLOUD"},
)
raise ValueError(f"LlamaCloud parsing returned no documents for {ctx.filename}")
actual_pages = page_limit_service.estimate_pages_from_markdown(markdown_documents)
final_pages = max(estimated_pages, actual_pages)
await _log_page_divergence(
ctx.task_logger,
ctx.log_entry,
ctx.filename,
estimated_pages,
actual_pages,
final_pages,
)
any_created = False
last_doc: Document | None = None
for doc in markdown_documents:
doc_result = await add_received_file_document_using_llamacloud(
ctx.session,
ctx.filename,
llamacloud_markdown_document=doc.text,
search_space_id=ctx.search_space_id,
user_id=ctx.user_id,
connector=ctx.connector,
enable_summary=ctx.enable_summary,
)
if doc_result:
any_created = True
last_doc = doc_result
if any_created:
await page_limit_service.update_page_usage(
ctx.user_id, final_pages, allow_exceed=True
)
if ctx.connector:
await update_document_from_connector(last_doc, ctx.connector, ctx.session)
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully processed file with LlamaCloud: {ctx.filename}",
{
"document_id": last_doc.id,
"content_hash": last_doc.content_hash,
"file_type": "document",
"etl_service": "LLAMACLOUD",
"pages_processed": final_pages,
"documents_count": len(markdown_documents),
},
)
return last_doc
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Document already exists (duplicate): {ctx.filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "LLAMACLOUD",
"documents_count": len(markdown_documents),
},
)
return None
async def _etl_docling(
ctx: _ProcessingContext,
page_limit_service,
estimated_pages: int,
) -> Document | None:
"""Parse and save via the Docling ETL service."""
await _notify(ctx, "parsing", "Extracting content")
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Processing file with Docling ETL: {ctx.filename}",
{
"file_type": "document",
"etl_service": "DOCLING",
"processing_stage": "parsing",
},
)
content = await parse_with_docling(ctx.file_path, ctx.filename)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
await ctx.task_logger.log_task_progress(
ctx.log_entry,
f"Docling parsing completed, creating document: {ctx.filename}",
{"processing_stage": "parsing_complete", "content_length": len(content)},
)
actual_pages = page_limit_service.estimate_pages_from_content_length(len(content))
final_pages = max(estimated_pages, actual_pages)
await _log_page_divergence(
ctx.task_logger,
ctx.log_entry,
ctx.filename,
estimated_pages,
actual_pages,
final_pages,
)
await _notify(ctx, "chunking")
result = await add_received_file_document_using_docling(
ctx.session,
ctx.filename,
docling_markdown_document=content,
search_space_id=ctx.search_space_id,
user_id=ctx.user_id,
connector=ctx.connector,
enable_summary=ctx.enable_summary,
)
if result:
await page_limit_service.update_page_usage(
ctx.user_id, final_pages, allow_exceed=True
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully processed file with Docling: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "document",
"etl_service": "DOCLING",
"pages_processed": final_pages,
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Document already exists (duplicate): {ctx.filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": "DOCLING",
},
)
return result
async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
"""Route a document file to the configured ETL service."""
"""Route a document file to the configured ETL service via the unified pipeline."""
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.services.page_limit_service import PageLimitExceededError, PageLimitService
page_limit_service = PageLimitService(ctx.session)
@@ -665,16 +207,60 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
os.unlink(ctx.file_path)
raise HTTPException(status_code=403, detail=str(e)) from e
-    etl_dispatch = {
-        "UNSTRUCTURED": _etl_unstructured,
-        "LLAMACLOUD": _etl_llamacloud,
-        "DOCLING": _etl_docling,
-    }
-    handler = etl_dispatch.get(app_config.ETL_SERVICE)
-    if handler is None:
-        raise RuntimeError(f"Unknown ETL_SERVICE: {app_config.ETL_SERVICE}")
+    await _notify(ctx, "parsing", "Extracting content")
-    return await handler(ctx, page_limit_service, estimated_pages)
etl_result = await EtlPipelineService().extract(
EtlRequest(
file_path=ctx.file_path,
filename=ctx.filename,
estimated_pages=estimated_pages,
)
)
with contextlib.suppress(Exception):
os.unlink(ctx.file_path)
await _notify(ctx, "chunking")
result = await save_file_document(
ctx.session,
ctx.filename,
etl_result.markdown_content,
ctx.search_space_id,
ctx.user_id,
etl_result.etl_service,
ctx.connector,
enable_summary=ctx.enable_summary,
)
if result:
await page_limit_service.update_page_usage(
ctx.user_id, estimated_pages, allow_exceed=True
)
if ctx.connector:
await update_document_from_connector(result, ctx.connector, ctx.session)
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Successfully processed file: {ctx.filename}",
{
"document_id": result.id,
"content_hash": result.content_hash,
"file_type": "document",
"etl_service": etl_result.etl_service,
"pages_processed": estimated_pages,
},
)
else:
await ctx.task_logger.log_task_success(
ctx.log_entry,
f"Document already exists (duplicate): {ctx.filename}",
{
"duplicate_detected": True,
"file_type": "document",
"etl_service": etl_result.etl_service,
},
)
return result
# ===================================================================
@@ -706,15 +292,14 @@ async def process_file_in_background(
)
try:
-        category = classify_file(filename)
+        from app.etl_pipeline.file_classifier import FileCategory as EtlFileCategory
+        from app.etl_pipeline.file_classifier import classify_file as etl_classify
-        if category == FileCategory.MARKDOWN:
-            return await _process_markdown_upload(ctx)
-        if category == FileCategory.DIRECT_CONVERT:
-            return await _process_direct_convert_upload(ctx)
-        if category == FileCategory.AUDIO:
-            return await _process_audio_upload(ctx)
-        return await _process_document_upload(ctx)
+        category = etl_classify(filename)
+        if category == EtlFileCategory.DOCUMENT:
+            return await _process_document_upload(ctx)
+        return await _process_non_document_upload(ctx)
except Exception as e:
await session.rollback()
@@ -758,201 +343,61 @@ async def _extract_file_content(
Returns:
Tuple of (markdown_content, etl_service_name).
"""
-    category = classify_file(filename)
+    from app.etl_pipeline.etl_document import EtlRequest
+    from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
+    from app.etl_pipeline.file_classifier import FileCategory
+    from app.etl_pipeline.file_classifier import classify_file as etl_classify
-    if category == FileCategory.MARKDOWN:
-        if notification:
-            await NotificationService.document_processing.notify_processing_progress(
-                session,
-                notification,
-                stage="parsing",
-                stage_message="Reading file",
-            )
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Processing markdown/text file: {filename}",
-            {"file_type": "markdown", "processing_stage": "reading_file"},
-        )
-        with open(file_path, encoding="utf-8") as f:
-            content = f.read()
-        with contextlib.suppress(Exception):
-            os.unlink(file_path)
-        return content, "MARKDOWN"
-    if category == FileCategory.DIRECT_CONVERT:
-        if notification:
-            await NotificationService.document_processing.notify_processing_progress(
-                session,
-                notification,
-                stage="parsing",
-                stage_message="Converting file",
-            )
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Direct-converting file to markdown: {filename}",
-            {"file_type": "direct_convert", "processing_stage": "converting"},
-        )
-        content = convert_file_directly(file_path, filename)
-        with contextlib.suppress(Exception):
-            os.unlink(file_path)
-        return content, "DIRECT_CONVERT"
-    if category == FileCategory.AUDIO:
-        if notification:
-            await NotificationService.document_processing.notify_processing_progress(
-                session,
-                notification,
-                stage="parsing",
-                stage_message="Transcribing audio",
-            )
-        await task_logger.log_task_progress(
-            log_entry,
-            f"Processing audio file for transcription: {filename}",
-            {"file_type": "audio", "processing_stage": "starting_transcription"},
-        )
-        transcribed_text = await _transcribe_audio(file_path, filename)
-        with contextlib.suppress(Exception):
-            os.unlink(file_path)
-        return transcribed_text, "AUDIO_TRANSCRIPTION"
-    # Document file — use ETL service
-    return await _extract_document_content(
-        file_path,
-        filename,
-        session,
-        user_id,
-        task_logger,
-        log_entry,
-        notification,
-    )
-async def _transcribe_audio(file_path: str, filename: str) -> str:
-    """Transcribe an audio file and return formatted markdown text."""
-    stt_service_type = (
-        "local"
-        if app_config.STT_SERVICE and app_config.STT_SERVICE.startswith("local/")
-        else "external"
-    )
-    if stt_service_type == "local":
-        from app.services.stt_service import stt_service
-        result = stt_service.transcribe_file(file_path)
-        text = result.get("text", "")
-        if not text:
-            raise ValueError("Transcription returned empty text")
-    else:
-        from litellm import atranscription
-        with open(file_path, "rb") as audio_file:
-            kwargs: dict = {
-                "model": app_config.STT_SERVICE,
-                "file": audio_file,
-                "api_key": app_config.STT_SERVICE_API_KEY,
-            }
-            if app_config.STT_SERVICE_API_BASE:
-                kwargs["api_base"] = app_config.STT_SERVICE_API_BASE
-            response = await atranscription(**kwargs)
-        text = response.get("text", "")
-        if not text:
-            raise ValueError("Transcription returned empty text")
-    return f"# Transcription of {filename}\n\n{text}"
-async def _extract_document_content(
-    file_path: str,
-    filename: str,
-    session: AsyncSession,
-    user_id: str,
-    task_logger: TaskLoggingService,
-    log_entry: Log,
-    notification: Notification | None,
-) -> tuple[str, str]:
-    """
-    Parse a document file via the configured ETL service.
-    Returns:
-        Tuple of (markdown_content, etl_service_name).
-    """
-    from app.services.page_limit_service import PageLimitService
-    page_limit_service = PageLimitService(session)
-    try:
-        estimated_pages = page_limit_service.estimate_pages_before_processing(file_path)
-    except Exception:
-        file_size = os.path.getsize(file_path)
-        estimated_pages = max(1, file_size // (80 * 1024))
-    await page_limit_service.check_page_limit(user_id, estimated_pages)
-    etl_service = app_config.ETL_SERVICE
-    markdown_content: str | None = None
+    category = etl_classify(filename)
+    estimated_pages = 0
    if notification:
+        stage_messages = {
+            FileCategory.PLAINTEXT: "Reading file",
+            FileCategory.DIRECT_CONVERT: "Converting file",
+            FileCategory.AUDIO: "Transcribing audio",
+            FileCategory.DOCUMENT: "Extracting content",
+        }
        await NotificationService.document_processing.notify_processing_progress(
            session,
            notification,
            stage="parsing",
-            stage_message="Extracting content",
+            stage_message=stage_messages.get(category, "Processing"),
        )
-    if etl_service == "UNSTRUCTURED":
-        from app.utils.document_converters import convert_document_to_markdown
+    await task_logger.log_task_progress(
+        log_entry,
+        f"Processing {category.value} file: {filename}",
+        {"file_type": category.value, "processing_stage": "extracting"},
+    )
-        docs = await parse_with_unstructured(file_path)
-        markdown_content = await convert_document_to_markdown(docs)
-        actual_pages = page_limit_service.estimate_pages_from_elements(docs)
-        final_pages = max(estimated_pages, actual_pages)
-        await page_limit_service.update_page_usage(
-            user_id, final_pages, allow_exceed=True
-        )
+    if category == FileCategory.DOCUMENT:
+        from app.services.page_limit_service import PageLimitService
-    elif etl_service == "LLAMACLOUD":
-        raw_result = await parse_with_llamacloud_retry(
+        page_limit_service = PageLimitService(session)
+        estimated_pages = _estimate_pages_safe(page_limit_service, file_path)
+        await page_limit_service.check_page_limit(user_id, estimated_pages)
+    result = await EtlPipelineService().extract(
+        EtlRequest(
            file_path=file_path,
+            filename=filename,
            estimated_pages=estimated_pages,
            task_logger=task_logger,
            log_entry=log_entry,
        )
-        markdown_documents = await raw_result.aget_markdown_documents(
-            split_by_page=False
-        )
-        if not markdown_documents:
-            raise RuntimeError(f"LlamaCloud parsing returned no documents: {filename}")
-        markdown_content = markdown_documents[0].text
+    )
+    if category == FileCategory.DOCUMENT:
+        await page_limit_service.update_page_usage(
+            user_id, estimated_pages, allow_exceed=True
+        )
-    elif etl_service == "DOCLING":
-        getLogger("docling.pipeline.base_pipeline").setLevel(ERROR)
-        getLogger("docling.document_converter").setLevel(ERROR)
-        getLogger("docling_core.transforms.chunker.hierarchical_chunker").setLevel(
-            ERROR
-        )
-        from docling.document_converter import DocumentConverter
-        converter = DocumentConverter()
-        result = converter.convert(file_path)
-        markdown_content = result.document.export_to_markdown()
-        await page_limit_service.update_page_usage(
-            user_id, estimated_pages, allow_exceed=True
-        )
-    else:
-        raise RuntimeError(f"Unknown ETL_SERVICE: {etl_service}")
    with contextlib.suppress(Exception):
        os.unlink(file_path)
-    if not markdown_content:
+    if not result.markdown_content:
        raise RuntimeError(f"Failed to extract content from file: {filename}")
-    return markdown_content, etl_service
+    return result.markdown_content, result.etl_service
async def process_file_in_background_with_document(