From 0fb1d3d37b5e697bf0cf3b3286f11a8f57dfebb0 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Fri, 12 Jun 2026 14:47:25 +0200 Subject: [PATCH] feat(etl-cache): route all file-based sources through the parse cache Every file ingestion path (Dropbox, Google Drive / Composio Drive, OneDrive, local folder, Obsidian, and the legacy upload handlers) now parses via the extract_with_cache facade instead of calling EtlPipelineService.extract directly, so identical bytes are deduplicated globally regardless of source. vision_llm is passed through, keeping the existing cacheability gate intact. --- .../connectors/dropbox/content_extractor.py | 7 ++++--- .../google_drive/content_extractor.py | 9 +++++---- .../connectors/onedrive/content_extractor.py | 9 +++++---- .../app/services/obsidian_plugin_indexer.py | 7 ++++--- .../local_folder_indexer.py | 7 ++++--- .../document_processors/file_processors.py | 19 +++++++++++-------- 6 files changed, 33 insertions(+), 25 deletions(-) diff --git a/surfsense_backend/app/connectors/dropbox/content_extractor.py b/surfsense_backend/app/connectors/dropbox/content_extractor.py index 372d2fc82..300010c26 100644 --- a/surfsense_backend/app/connectors/dropbox/content_extractor.py +++ b/surfsense_backend/app/connectors/dropbox/content_extractor.py @@ -90,11 +90,12 @@ async def download_and_extract_content( if error: return None, metadata, error + from app.etl_pipeline.cache import extract_with_cache from app.etl_pipeline.etl_document import EtlRequest - from app.etl_pipeline.etl_pipeline_service import EtlPipelineService - result = await EtlPipelineService(vision_llm=vision_llm).extract( - EtlRequest(file_path=temp_file_path, filename=file_name) + result = await extract_with_cache( + EtlRequest(file_path=temp_file_path, filename=file_name), + vision_llm=vision_llm, ) markdown = result.markdown_content return markdown, metadata, None diff --git a/surfsense_backend/app/connectors/google_drive/content_extractor.py b/surfsense_backend/app/connectors/google_drive/content_extractor.py index 59392831d..1ea047978 100644 --- a/surfsense_backend/app/connectors/google_drive/content_extractor.py +++ b/surfsense_backend/app/connectors/google_drive/content_extractor.py @@ -122,12 +122,13 @@ async def download_and_extract_content( async def _parse_file_to_markdown( file_path: str, filename: str, *, vision_llm=None ) -> str: - """Parse a local file to markdown using the unified ETL pipeline.""" + """Parse a local file to markdown via the cache-aware ETL pipeline.""" + from app.etl_pipeline.cache import extract_with_cache from app.etl_pipeline.etl_document import EtlRequest - from app.etl_pipeline.etl_pipeline_service import EtlPipelineService - result = await EtlPipelineService(vision_llm=vision_llm).extract( - EtlRequest(file_path=file_path, filename=filename) + result = await extract_with_cache( + EtlRequest(file_path=file_path, filename=filename), + vision_llm=vision_llm, ) return result.markdown_content diff --git a/surfsense_backend/app/connectors/onedrive/content_extractor.py b/surfsense_backend/app/connectors/onedrive/content_extractor.py index 3154f2eca..fb1d31fbc 100644 --- a/surfsense_backend/app/connectors/onedrive/content_extractor.py +++ b/surfsense_backend/app/connectors/onedrive/content_extractor.py @@ -84,11 +84,12 @@ async def download_and_extract_content( async def _parse_file_to_markdown( file_path: str, filename: str, *, vision_llm=None ) -> str: - """Parse a local file to markdown using the unified ETL pipeline.""" + """Parse a local file to markdown via the cache-aware ETL pipeline.""" + from app.etl_pipeline.cache import extract_with_cache from app.etl_pipeline.etl_document import EtlRequest - from app.etl_pipeline.etl_pipeline_service import EtlPipelineService - result = await EtlPipelineService(vision_llm=vision_llm).extract( - EtlRequest(file_path=file_path, filename=filename) + result = await extract_with_cache( + EtlRequest(file_path=file_path, filename=filename), + vision_llm=vision_llm, ) return result.markdown_content diff --git a/surfsense_backend/app/services/obsidian_plugin_indexer.py b/surfsense_backend/app/services/obsidian_plugin_indexer.py index 13f43d1ee..cd05d7935 100644 --- a/surfsense_backend/app/services/obsidian_plugin_indexer.py +++ b/surfsense_backend/app/services/obsidian_plugin_indexer.py @@ -199,11 +199,12 @@ async def _extract_binary_attachment_markdown( async def _run_etl_extract(*, file_path: str, filename: str, vision_llm): """Lazy-load ETL dependencies to avoid module-import cycles.""" + from app.etl_pipeline.cache import extract_with_cache from app.etl_pipeline.etl_document import EtlRequest - from app.etl_pipeline.etl_pipeline_service import EtlPipelineService - return await EtlPipelineService(vision_llm=vision_llm).extract( - EtlRequest(file_path=file_path, filename=filename) + return await extract_with_cache( + EtlRequest(file_path=file_path, filename=filename), + vision_llm=vision_llm, ) diff --git a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py index 1a2d4b967..2505fa7c4 100644 --- a/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/local_folder_indexer.py @@ -162,12 +162,13 @@ async def _read_file_content( All file types (plaintext, audio, direct-convert, document, image) are handled by ``EtlPipelineService``. """ + from app.etl_pipeline.cache import extract_with_cache from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode - from app.etl_pipeline.etl_pipeline_service import EtlPipelineService mode = ProcessingMode.coerce(processing_mode) - result = await EtlPipelineService(vision_llm=vision_llm).extract( - EtlRequest(file_path=file_path, filename=filename, processing_mode=mode) + result = await extract_with_cache( + EtlRequest(file_path=file_path, filename=filename, processing_mode=mode), + vision_llm=vision_llm, ) return result.markdown_content diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 0c3d30766..174ac966d 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -1,8 +1,9 @@ """ File document processors orchestrating content extraction and indexing. -Delegates content extraction to ``app.etl_pipeline.EtlPipelineService`` and -keeps only orchestration concerns (notifications, logging, page limits, saving). +Delegates content extraction to the cache-aware ``extract_with_cache`` facade +(over ``EtlPipelineService``) and keeps only orchestration concerns +(notifications, logging, page limits, saving). """ from __future__ import annotations @@ -116,8 +117,8 @@ async def _log_page_divergence( async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | None: """Extract content from a non-document file (plaintext/direct_convert/audio/image) via the unified ETL pipeline.""" + from app.etl_pipeline.cache import extract_with_cache from app.etl_pipeline.etl_document import EtlRequest - from app.etl_pipeline.etl_pipeline_service import EtlPipelineService await _notify(ctx, "parsing", "Processing file") await ctx.task_logger.log_task_progress( @@ -136,8 +137,9 @@ async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | No vision_llm = await get_vision_llm(ctx.session, ctx.search_space_id) - etl_result = await EtlPipelineService(vision_llm=vision_llm).extract( - EtlRequest(file_path=ctx.file_path, filename=ctx.filename) + etl_result = await extract_with_cache( + EtlRequest(file_path=ctx.file_path, filename=ctx.filename), + vision_llm=vision_llm, ) with contextlib.suppress(Exception): @@ -183,8 +185,8 @@ async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | No async def _process_document_upload(ctx: _ProcessingContext) -> Document | None: """Route a document file to the configured ETL service via the unified pipeline.""" + from app.etl_pipeline.cache import extract_with_cache from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode - from app.etl_pipeline.etl_pipeline_service import EtlPipelineService from app.services.etl_credit_service import ( EtlCreditService, InsufficientCreditsError, @@ -237,13 +239,14 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None: vision_llm = await get_vision_llm(ctx.session, ctx.search_space_id) - etl_result = await EtlPipelineService(vision_llm=vision_llm).extract( + etl_result = await extract_with_cache( EtlRequest( file_path=ctx.file_path, filename=ctx.filename, estimated_pages=estimated_pages, processing_mode=mode, - ) + ), + vision_llm=vision_llm, ) with contextlib.suppress(Exception):