feat(etl-cache): route all file-based sources through the parse cache

Every file ingestion path (Dropbox, Google Drive / Composio Drive, OneDrive,
local folder, Obsidian, and the legacy upload handlers) now parses via the
extract_with_cache facade instead of calling EtlPipelineService.extract
directly, so identical bytes are deduplicated globally regardless of source.
vision_llm is passed through, keeping the existing cacheability gate intact.
This commit is contained in:
CREDO23 2026-06-12 14:47:25 +02:00
parent 99cf212c31
commit 0fb1d3d37b
6 changed files with 33 additions and 25 deletions

View file

@ -90,11 +90,12 @@ async def download_and_extract_content(
if error:
return None, metadata, error
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=temp_file_path, filename=file_name)
result = await extract_with_cache(
EtlRequest(file_path=temp_file_path, filename=file_name),
vision_llm=vision_llm,
)
markdown = result.markdown_content
return markdown, metadata, None

View file

@ -122,12 +122,13 @@ async def download_and_extract_content(
async def _parse_file_to_markdown(
file_path: str, filename: str, *, vision_llm=None
) -> str:
"""Parse a local file to markdown using the unified ETL pipeline."""
"""Parse a local file to markdown via the cache-aware ETL pipeline."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename)
result = await extract_with_cache(
EtlRequest(file_path=file_path, filename=filename),
vision_llm=vision_llm,
)
return result.markdown_content

View file

@ -84,11 +84,12 @@ async def download_and_extract_content(
async def _parse_file_to_markdown(
file_path: str, filename: str, *, vision_llm=None
) -> str:
"""Parse a local file to markdown using the unified ETL pipeline."""
"""Parse a local file to markdown via the cache-aware ETL pipeline."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename)
result = await extract_with_cache(
EtlRequest(file_path=file_path, filename=filename),
vision_llm=vision_llm,
)
return result.markdown_content

View file

@ -199,11 +199,12 @@ async def _extract_binary_attachment_markdown(
async def _run_etl_extract(*, file_path: str, filename: str, vision_llm):
"""Lazy-load ETL dependencies to avoid module-import cycles."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
return await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename)
return await extract_with_cache(
EtlRequest(file_path=file_path, filename=filename),
vision_llm=vision_llm,
)

View file

@ -162,12 +162,13 @@ async def _read_file_content(
All file types (plaintext, audio, direct-convert, document, image) are
handled by ``EtlPipelineService``.
"""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
mode = ProcessingMode.coerce(processing_mode)
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename, processing_mode=mode)
result = await extract_with_cache(
EtlRequest(file_path=file_path, filename=filename, processing_mode=mode),
vision_llm=vision_llm,
)
return result.markdown_content

View file

@ -1,8 +1,9 @@
"""
File document processors orchestrating content extraction and indexing.
Delegates content extraction to ``app.etl_pipeline.EtlPipelineService`` and
keeps only orchestration concerns (notifications, logging, page limits, saving).
Delegates content extraction to the cache-aware ``extract_with_cache`` facade
(over ``EtlPipelineService``) and keeps only orchestration concerns
(notifications, logging, page limits, saving).
"""
from __future__ import annotations
@ -116,8 +117,8 @@ async def _log_page_divergence(
async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | None:
"""Extract content from a non-document file (plaintext/direct_convert/audio/image) via the unified ETL pipeline."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
await _notify(ctx, "parsing", "Processing file")
await ctx.task_logger.log_task_progress(
@ -136,8 +137,9 @@ async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | No
vision_llm = await get_vision_llm(ctx.session, ctx.search_space_id)
etl_result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=ctx.file_path, filename=ctx.filename)
etl_result = await extract_with_cache(
EtlRequest(file_path=ctx.file_path, filename=ctx.filename),
vision_llm=vision_llm,
)
with contextlib.suppress(Exception):
@ -183,8 +185,8 @@ async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | No
async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
"""Route a document file to the configured ETL service via the unified pipeline."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.services.etl_credit_service import (
EtlCreditService,
InsufficientCreditsError,
@ -237,13 +239,14 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
vision_llm = await get_vision_llm(ctx.session, ctx.search_space_id)
etl_result = await EtlPipelineService(vision_llm=vision_llm).extract(
etl_result = await extract_with_cache(
EtlRequest(
file_path=ctx.file_path,
filename=ctx.filename,
estimated_pages=estimated_pages,
processing_mode=mode,
)
),
vision_llm=vision_llm,
)
with contextlib.suppress(Exception):