From 81fa219b3010e58c39c39ccfc3628bcec251b6e2 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Thu, 4 Jun 2026 00:50:19 +0530 Subject: [PATCH] feat(backend): Remove LLM summaries from document indexing --- .../adapters/file_upload_adapter.py | 12 +- .../indexing_pipeline/connector_document.py | 2 - .../indexing_pipeline/document_summarizer.py | 30 ---- .../indexing_pipeline_service.py | 36 +---- .../app/routes/documents_routes.py | 4 - .../app/services/docling_service.py | 143 ------------------ .../app/services/obsidian_plugin_indexer.py | 23 +-- .../app/services/task_dispatcher.py | 3 - .../celery_tasks/document_reindex_tasks.py | 7 +- .../app/tasks/celery_tasks/document_tasks.py | 29 +--- .../app/tasks/document_processors/_save.py | 82 +--------- .../circleback_processor.py | 36 +---- .../extension_processor.py | 25 +-- .../document_processors/file_processors.py | 15 +- .../document_processors/markdown_processor.py | 20 +-- .../document_processors/youtube_processor.py | 39 +---- .../app/utils/document_converters.py | 52 ------- 17 files changed, 40 insertions(+), 518 deletions(-) delete mode 100644 surfsense_backend/app/indexing_pipeline/document_summarizer.py diff --git a/surfsense_backend/app/indexing_pipeline/adapters/file_upload_adapter.py b/surfsense_backend/app/indexing_pipeline/adapters/file_upload_adapter.py index 0bbb67105..9a9e4e4d6 100644 --- a/surfsense_backend/app/indexing_pipeline/adapters/file_upload_adapter.py +++ b/surfsense_backend/app/indexing_pipeline/adapters/file_upload_adapter.py @@ -18,8 +18,6 @@ class UploadDocumentAdapter: etl_service: str, search_space_id: int, user_id: str, - llm, - should_summarize: bool = False, ) -> None: connector_doc = ConnectorDocument( title=filename, @@ -29,9 +27,7 @@ class UploadDocumentAdapter: search_space_id=search_space_id, created_by_id=user_id, connector_id=None, - should_summarize=should_summarize, should_use_code_chunker=False, - fallback_summary=markdown_content[:4000], metadata={ "FILE_NAME": filename, "ETL_SERVICE": etl_service, @@ -43,7 +39,7 @@ class UploadDocumentAdapter: if not documents: raise RuntimeError("prepare_for_indexing returned no documents") - indexed = await self._service.index(documents[0], connector_doc, llm) + indexed = await self._service.index(documents[0], connector_doc) if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY): raise RuntimeError(indexed.status.get("reason", "Indexing failed")) @@ -51,7 +47,7 @@ class UploadDocumentAdapter: indexed.content_needs_reindexing = False await self._session.commit() - async def reindex(self, document: Document, llm) -> None: + async def reindex(self, document: Document) -> None: """Re-index an existing document after its source_markdown has been updated.""" if not document.source_markdown: raise RuntimeError("Document has no source_markdown to reindex") @@ -66,15 +62,13 @@ class UploadDocumentAdapter: search_space_id=document.search_space_id, created_by_id=str(document.created_by_id), connector_id=document.connector_id, - should_summarize=True, should_use_code_chunker=False, - fallback_summary=document.source_markdown[:4000], metadata=metadata, ) document.content_hash = compute_content_hash(connector_doc) - indexed = await self._service.index(document, connector_doc, llm) + indexed = await self._service.index(document, connector_doc) if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY): raise RuntimeError(indexed.status.get("reason", "Reindexing failed")) diff --git a/surfsense_backend/app/indexing_pipeline/connector_document.py b/surfsense_backend/app/indexing_pipeline/connector_document.py index 4f5d6e2e0..1297a6b46 100644 --- a/surfsense_backend/app/indexing_pipeline/connector_document.py +++ b/surfsense_backend/app/indexing_pipeline/connector_document.py @@ -11,9 +11,7 @@ class ConnectorDocument(BaseModel): unique_id: str document_type: DocumentType search_space_id: int = Field(gt=0) - should_summarize: bool = True should_use_code_chunker: bool = False - fallback_summary: str | None = None metadata: dict = {} connector_id: int | None = None created_by_id: str diff --git a/surfsense_backend/app/indexing_pipeline/document_summarizer.py b/surfsense_backend/app/indexing_pipeline/document_summarizer.py deleted file mode 100644 index 76cc77377..000000000 --- a/surfsense_backend/app/indexing_pipeline/document_summarizer.py +++ /dev/null @@ -1,30 +0,0 @@ -from app.prompts import SUMMARY_PROMPT_TEMPLATE -from app.utils.document_converters import optimize_content_for_context_window - - -async def summarize_document( - source_markdown: str, llm, metadata: dict | None = None -) -> str: - """Generate a text summary of a document using an LLM, prefixed with metadata when provided.""" - model_name = getattr(llm, "model", "gpt-3.5-turbo") - optimized_content = optimize_content_for_context_window( - source_markdown, metadata, model_name - ) - - summary_chain = SUMMARY_PROMPT_TEMPLATE | llm - content_with_metadata = ( - f"\n\n{metadata}\n\n" - f"\n\n\n\n{optimized_content}\n\n" - ) - summary_result = await summary_chain.ainvoke({"document": content_with_metadata}) - summary_content = summary_result.content - - if metadata: - metadata_parts = ["# DOCUMENT METADATA"] - for key, value in metadata.items(): - if value: - metadata_parts.append(f"**{key.replace('_', ' ').title()}:** {value}") - metadata_section = "\n".join(metadata_parts) - return f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}" - - return summary_content diff --git a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py index 282bd6034..3d0124059 100644 --- a/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py +++ b/surfsense_backend/app/indexing_pipeline/indexing_pipeline_service.py @@ -31,7 +31,6 @@ from app.indexing_pipeline.document_persistence import ( attach_chunks_to_document, rollback_and_persist_failure, ) -from app.indexing_pipeline.document_summarizer import summarize_document from app.indexing_pipeline.exceptions import ( EMBEDDING_ERRORS, PERMANENT_LLM_ERRORS, @@ -203,9 +202,7 @@ class IndexingPipelineService: await self.session.commit() - async def index_batch( - self, connector_docs: list[ConnectorDocument], llm - ) -> list[Document]: + async def index_batch(self, connector_docs: list[ConnectorDocument]) -> list[Document]: """Convenience method: prepare_for_indexing then index each document. Indexers that need heartbeat callbacks or custom per-document logic @@ -218,7 +215,7 @@ class IndexingPipelineService: connector_doc = doc_map.get(document.unique_identifier_hash) if connector_doc is None: continue - result = await self.index(document, connector_doc, llm) + result = await self.index(document, connector_doc) results.append(result) return results @@ -350,11 +347,9 @@ class IndexingPipelineService: await self.session.rollback() return [] - async def index( - self, document: Document, connector_doc: ConnectorDocument, llm - ) -> Document: + async def index(self, document: Document, connector_doc: ConnectorDocument) -> Document: """ - Run summarization, embedding, and chunking for a document and persist the results. + Run deterministic content storage, embedding, and chunking for a document. """ ctx = PipelineLogContext( connector_id=connector_doc.connector_id, @@ -379,20 +374,7 @@ class IndexingPipelineService: document.status = DocumentStatus.processing() await self.session.commit() - t_step = time.perf_counter() - if connector_doc.should_summarize and llm is not None: - content = await summarize_document( - connector_doc.source_markdown, llm, connector_doc.metadata - ) - perf.info( - "[indexing] summarize_document doc=%d in %.3fs", - document.id, - time.perf_counter() - t_step, - ) - elif connector_doc.should_summarize and connector_doc.fallback_summary: - content = connector_doc.fallback_summary - else: - content = connector_doc.source_markdown + content = connector_doc.source_markdown await self.session.execute( delete(Chunk).where(Chunk.document_id == document.id) @@ -523,7 +505,6 @@ class IndexingPipelineService: async def index_batch_parallel( self, connector_docs: list[ConnectorDocument], - get_llm: Callable[[AsyncSession], Awaitable], *, max_concurrency: int = 4, on_heartbeat: Callable[[int], Awaitable[None]] | None = None, @@ -532,8 +513,8 @@ class IndexingPipelineService: """Index documents in parallel with bounded concurrency. Phase 1 (serial): prepare_for_indexing using self.session. - Phase 2 (parallel): index each document in an isolated session, - bounded by a semaphore to avoid overwhelming APIs/DB. + Phase 2 (parallel): index each document in an isolated session, bounded + by a semaphore to avoid overwhelming embedding APIs/DB. """ logger = logging.getLogger(__name__) perf = get_perf_logger() @@ -577,9 +558,8 @@ class IndexingPipelineService: failed_count += 1 return document - llm = await get_llm(isolated_session) iso_pipeline = IndexingPipelineService(isolated_session) - result = await iso_pipeline.index(refetched, connector_doc, llm) + result = await iso_pipeline.index(refetched, connector_doc) async with lock: if DocumentStatus.is_state( diff --git a/surfsense_backend/app/routes/documents_routes.py b/surfsense_backend/app/routes/documents_routes.py index 4501f2111..cafd34ef7 100644 --- a/surfsense_backend/app/routes/documents_routes.py +++ b/surfsense_backend/app/routes/documents_routes.py @@ -125,7 +125,6 @@ async def create_documents( async def create_documents_file_upload( files: list[UploadFile], search_space_id: int = Form(...), - should_summarize: bool = Form(False), use_vision_llm: bool = Form(False), processing_mode: str = Form("basic"), session: AsyncSession = Depends(get_async_session), @@ -309,7 +308,6 @@ async def create_documents_file_upload( filename=filename, search_space_id=search_space_id, user_id=str(user.id), - should_summarize=should_summarize, use_vision_llm=use_vision_llm, processing_mode=validated_mode.value, ) @@ -1586,7 +1584,6 @@ async def folder_upload( search_space_id: int = Form(...), relative_paths: str = Form(...), root_folder_id: int | None = Form(None), - enable_summary: bool = Form(False), use_vision_llm: bool = Form(False), processing_mode: str = Form("basic"), session: AsyncSession = Depends(get_async_session), @@ -1719,7 +1716,6 @@ async def folder_upload( user_id=str(user.id), folder_name=folder_name, root_folder_id=root_folder_id, - enable_summary=enable_summary, use_vision_llm=use_vision_llm, file_mappings=list(file_mappings), processing_mode=validated_mode.value, diff --git a/surfsense_backend/app/services/docling_service.py b/surfsense_backend/app/services/docling_service.py index cf51efb4a..dc87e75f0 100644 --- a/surfsense_backend/app/services/docling_service.py +++ b/surfsense_backend/app/services/docling_service.py @@ -191,149 +191,6 @@ class DoclingService: logger.error(f"Full traceback: {traceback.format_exc()}") raise RuntimeError(f"Docling processing failed: {e}") from e - async def process_large_document_summary( - self, content: str, llm, document_title: str = "Document" - ) -> str: - """ - Process large documents using chunked LLM summarization. - - Args: - content: The full document content - llm: The language model to use for summarization - document_title: Title of the document for context - - Returns: - Final summary of the document - """ - # Large document threshold (100K characters ≈ 25K tokens) - large_document_threshold = 100_000 - - if len(content) <= large_document_threshold: - # For smaller documents, use direct processing - logger.info( - f"📄 Document size: {len(content)} chars - using direct processing" - ) - from app.prompts import SUMMARY_PROMPT_TEMPLATE - - summary_chain = SUMMARY_PROMPT_TEMPLATE | llm - result = await summary_chain.ainvoke({"document": content}) - return result.content - - logger.info( - f"📚 Large document detected: {len(content)} chars - using chunked processing" - ) - - # Import chunker from config - # Create LLM-optimized chunks (8K tokens max for safety) - from chonkie import OverlapRefinery, RecursiveChunker - from langchain_core.prompts import PromptTemplate - - llm_chunker = RecursiveChunker( - chunk_size=8000 # Conservative for most LLMs - ) - - # Apply overlap refinery for context preservation (10% overlap = 800 tokens) - overlap_refinery = OverlapRefinery( - context_size=0.1, # 10% overlap for context preservation - method="suffix", # Add next chunk context to current chunk - ) - - # First chunk the content, then apply overlap refinery - initial_chunks = llm_chunker.chunk(content) - chunks = overlap_refinery.refine(initial_chunks) - total_chunks = len(chunks) - - logger.info(f"📄 Split into {total_chunks} chunks for LLM processing") - - # Template for chunk processing - chunk_template = PromptTemplate( - input_variables=["chunk", "chunk_number", "total_chunks"], - template=""" -You are summarizing chunk {chunk_number} of {total_chunks} from a large document. - -Create a comprehensive summary of this document chunk. Focus on: -- Key concepts, facts, and information -- Important details and context -- Main topics and themes - -Provide a clear, structured summary that captures the essential content. - -Chunk {chunk_number}/{total_chunks}: - -{chunk} - -""", - ) - - # Process each chunk individually - chunk_summaries = [] - for i, chunk in enumerate(chunks, 1): - try: - logger.info( - f"🔄 Processing chunk {i}/{total_chunks} ({len(chunk.text)} chars)" - ) - - chunk_chain = chunk_template | llm - chunk_result = await chunk_chain.ainvoke( - { - "chunk": chunk.text, - "chunk_number": i, - "total_chunks": total_chunks, - } - ) - - chunk_summary = chunk_result.content - chunk_summaries.append(f"=== Section {i} ===\n{chunk_summary}") - - logger.info(f"✅ Completed chunk {i}/{total_chunks}") - - except Exception as e: - logger.error(f"❌ Failed to process chunk {i}/{total_chunks}: {e}") - chunk_summaries.append(f"=== Section {i} ===\n[Processing failed]") - - # Combine summaries into final document summary - logger.info(f"🔄 Combining {len(chunk_summaries)} chunk summaries") - - try: - combine_template = PromptTemplate( - input_variables=["summaries", "document_title"], - template=""" -You are combining multiple section summaries into a final comprehensive document summary. - -Create a unified, coherent summary from the following section summaries of "{document_title}". -Ensure: -- Logical flow and organization -- No redundancy or repetition -- Comprehensive coverage of all key points -- Professional, objective tone - - -{summaries} - -""", - ) - - combined_summaries = "\n\n".join(chunk_summaries) - combine_chain = combine_template | llm - - final_result = await combine_chain.ainvoke( - {"summaries": combined_summaries, "document_title": document_title} - ) - - final_summary = final_result.content - logger.info( - f"✅ Large document processing complete: {len(final_summary)} chars summary" - ) - - return final_summary - - except Exception as e: - logger.error(f"❌ Failed to combine summaries: {e}") - # Fallback: return concatenated chunk summaries - fallback_summary = "\n\n".join(chunk_summaries) - logger.warning("⚠️ Using fallback combined summary") - return fallback_summary - def create_docling_service() -> DoclingService: """Create a Docling service instance.""" diff --git a/surfsense_backend/app/services/obsidian_plugin_indexer.py b/surfsense_backend/app/services/obsidian_plugin_indexer.py index 0fc4f30f4..13f43d1ee 100644 --- a/surfsense_backend/app/services/obsidian_plugin_indexer.py +++ b/surfsense_backend/app/services/obsidian_plugin_indexer.py @@ -233,18 +233,6 @@ async def _resolve_attachment_vision_llm( return await get_vision_llm(session, search_space_id) -async def _resolve_summary_llm( - session: AsyncSession, *, user_id: str, search_space_id: int, should_summarize: bool -): - """Fetch summary LLM only when indexing summary is enabled.""" - if not should_summarize: - return None - - from app.services.llm_service import get_user_long_context_llm - - return await get_user_long_context_llm(session, user_id, search_space_id) - - def _require_extracted_attachment_content( *, content: str, etl_meta: dict[str, Any], path: str ) -> str: @@ -349,13 +337,6 @@ async def upsert_note( path=payload.path, ) - llm = await _resolve_summary_llm( - session, - user_id=str(user_id), - search_space_id=search_space_id, - should_summarize=connector.enable_summary, - ) - document_string = _build_document_string( payload, vault_name, content_override=content_for_index ) @@ -374,8 +355,6 @@ async def upsert_note( search_space_id=search_space_id, connector_id=connector.id, created_by_id=str(user_id), - should_summarize=connector.enable_summary, - fallback_summary=f"Obsidian Note: {payload.name}\n\n{content_for_index}", metadata=metadata, ) @@ -388,7 +367,7 @@ async def upsert_note( document = prepared[0] - return await pipeline.index(document, connector_doc, llm) + return await pipeline.index(document, connector_doc) async def rename_note( diff --git a/surfsense_backend/app/services/task_dispatcher.py b/surfsense_backend/app/services/task_dispatcher.py index 210084102..43957be03 100644 --- a/surfsense_backend/app/services/task_dispatcher.py +++ b/surfsense_backend/app/services/task_dispatcher.py @@ -18,7 +18,6 @@ class TaskDispatcher(Protocol): filename: str, search_space_id: int, user_id: str, - should_summarize: bool = False, use_vision_llm: bool = False, processing_mode: str = "basic", ) -> None: ... @@ -35,7 +34,6 @@ class CeleryTaskDispatcher: filename: str, search_space_id: int, user_id: str, - should_summarize: bool = False, use_vision_llm: bool = False, processing_mode: str = "basic", ) -> None: @@ -49,7 +47,6 @@ class CeleryTaskDispatcher: filename=filename, search_space_id=search_space_id, user_id=user_id, - should_summarize=should_summarize, use_vision_llm=use_vision_llm, processing_mode=processing_mode, ) diff --git a/surfsense_backend/app/tasks/celery_tasks/document_reindex_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_reindex_tasks.py index 5d6bde6c1..d36a7c05f 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_reindex_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_reindex_tasks.py @@ -9,7 +9,6 @@ from sqlalchemy.orm import selectinload from app.celery_app import celery_app from app.db import Document from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter -from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task @@ -68,12 +67,8 @@ async def _reindex_document(document_id: int, user_id: str): logger.info(f"Reindexing document {document_id} ({document.title})") - user_llm = await get_user_long_context_llm( - session, user_id, document.search_space_id - ) - adapter = UploadDocumentAdapter(session) - await adapter.reindex(document=document, llm=user_llm) + await adapter.reindex(document=document) await task_logger.log_task_success( log_entry, diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 4781ca6a5..5b7b62f35 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -765,7 +765,6 @@ def process_file_upload_with_document_task( filename: str, search_space_id: int, user_id: str, - should_summarize: bool = False, use_vision_llm: bool = False, processing_mode: str = "basic", ): @@ -782,7 +781,6 @@ def process_file_upload_with_document_task( filename: Original filename search_space_id: ID of the search space user_id: ID of the user - should_summarize: Whether to generate an LLM summary """ import traceback @@ -814,7 +812,6 @@ def process_file_upload_with_document_task( filename, search_space_id, user_id, - should_summarize=should_summarize, use_vision_llm=use_vision_llm, processing_mode=processing_mode, ) @@ -850,7 +847,6 @@ async def _process_file_with_document( filename: str, search_space_id: int, user_id: str, - should_summarize: bool = False, use_vision_llm: bool = False, processing_mode: str = "basic", ): @@ -954,7 +950,6 @@ async def _process_file_with_document( task_logger=task_logger, log_entry=log_entry, notification=notification, - should_summarize=should_summarize, use_vision_llm=use_vision_llm, processing_mode=processing_mode, ) @@ -1258,7 +1253,6 @@ def index_local_folder_task( exclude_patterns: list[str] | None = None, file_extensions: list[str] | None = None, root_folder_id: int | None = None, - enable_summary: bool = False, target_file_paths: list[str] | None = None, ): """Celery task to index a local folder. Config is passed directly — no connector row.""" @@ -1271,7 +1265,6 @@ def index_local_folder_task( exclude_patterns=exclude_patterns, file_extensions=file_extensions, root_folder_id=root_folder_id, - enable_summary=enable_summary, target_file_paths=target_file_paths, ) ) @@ -1285,7 +1278,6 @@ async def _index_local_folder_async( exclude_patterns: list[str] | None = None, file_extensions: list[str] | None = None, root_folder_id: int | None = None, - enable_summary: bool = False, target_file_paths: list[str] | None = None, ): """Run local folder indexing with notification + heartbeat.""" @@ -1343,8 +1335,7 @@ async def _index_local_folder_async( exclude_patterns=exclude_patterns, file_extensions=file_extensions, root_folder_id=root_folder_id, - enable_summary=enable_summary, - target_file_paths=target_file_paths, + target_file_paths=target_file_paths, on_heartbeat_callback=_heartbeat_progress if (is_batch or is_full_scan) else None, @@ -1400,7 +1391,6 @@ def index_uploaded_folder_files_task( user_id: str, folder_name: str, root_folder_id: int, - enable_summary: bool, file_mappings: list[dict], use_vision_llm: bool = False, processing_mode: str = "basic", @@ -1412,7 +1402,6 @@ def index_uploaded_folder_files_task( user_id=user_id, folder_name=folder_name, root_folder_id=root_folder_id, - enable_summary=enable_summary, file_mappings=file_mappings, use_vision_llm=use_vision_llm, processing_mode=processing_mode, @@ -1425,7 +1414,6 @@ async def _index_uploaded_folder_files_async( user_id: str, folder_name: str, root_folder_id: int, - enable_summary: bool, file_mappings: list[dict], use_vision_llm: bool = False, processing_mode: str = "basic", @@ -1475,8 +1463,7 @@ async def _index_uploaded_folder_files_async( user_id=user_id, folder_name=folder_name, root_folder_id=root_folder_id, - enable_summary=enable_summary, - file_mappings=file_mappings, + file_mappings=file_mappings, on_heartbeat_callback=_heartbeat_progress, use_vision_llm=use_vision_llm, processing_mode=processing_mode, @@ -1563,12 +1550,10 @@ async def _ai_sort_search_space_async(search_space_id: int, user_id: str): t_start = time.perf_counter() try: from app.services.ai_file_sort_service import ai_sort_all_documents - from app.services.llm_service import get_document_summary_llm + from app.services.llm_service import get_agent_llm async with get_celery_session_maker()() as session: - llm = await get_document_summary_llm( - session, search_space_id, disable_streaming=True - ) + llm = await get_agent_llm(session, search_space_id, disable_streaming=True) if llm is None: logger.warning( "No LLM configured for search_space=%d, skipping AI sort", @@ -1604,7 +1589,7 @@ def ai_sort_document_task(self, search_space_id: int, user_id: str, document_id: async def _ai_sort_document_async(search_space_id: int, user_id: str, document_id: int): from app.db import Document from app.services.ai_file_sort_service import ai_sort_document - from app.services.llm_service import get_document_summary_llm + from app.services.llm_service import get_agent_llm async with get_celery_session_maker()() as session: document = await session.get(Document, document_id) @@ -1612,9 +1597,7 @@ async def _ai_sort_document_async(search_space_id: int, user_id: str, document_i logger.warning("Document %d not found, skipping AI sort", document_id) return - llm = await get_document_summary_llm( - session, search_space_id, disable_streaming=True - ) + llm = await get_agent_llm(session, search_space_id, disable_streaming=True) if llm is None: logger.warning( "No LLM for search_space=%d, skipping AI sort of doc=%d", diff --git a/surfsense_backend/app/tasks/document_processors/_save.py b/surfsense_backend/app/tasks/document_processors/_save.py index d633dd4f6..3b9616cbd 100644 --- a/surfsense_backend/app/tasks/document_processors/_save.py +++ b/surfsense_backend/app/tasks/document_processors/_save.py @@ -1,20 +1,15 @@ -""" -Unified document save/update logic for file processors. -""" +"""Unified document save/update logic for file processors.""" -import asyncio import logging from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.db import Document, DocumentStatus, DocumentType -from app.services.llm_service import get_user_long_context_llm from app.utils.document_converters import ( create_document_chunks, embed_text, generate_content_hash, - generate_document_summary, ) from ._helpers import ( @@ -24,59 +19,6 @@ from ._helpers import ( ) from .base import get_current_timestamp, safe_set_chunks -# --------------------------------------------------------------------------- -# Summary generation -# --------------------------------------------------------------------------- - - -async def _generate_summary( - markdown_content: str, - file_name: str, - etl_service: str, - user_llm, - enable_summary: bool, -) -> tuple[str, list[float]]: - """ - Generate a document summary and embedding. - - Docling uses its own large-document summary strategy; other ETL services - use the standard ``generate_document_summary`` helper. - """ - if not enable_summary: - summary = f"File: {file_name}\n\n{markdown_content[:4000]}" - return summary, await asyncio.to_thread(embed_text, summary) - - if etl_service == "DOCLING": - from app.services.docling_service import create_docling_service - - docling_service = create_docling_service() - summary_text = await docling_service.process_large_document_summary( - content=markdown_content, llm=user_llm, document_title=file_name - ) - - meta = { - "file_name": file_name, - "etl_service": etl_service, - "document_type": "File Document", - } - parts = ["# DOCUMENT METADATA"] - for key, value in meta.items(): - if value: - formatted_key = key.replace("_", " ").title() - parts.append(f"**{formatted_key}:** {value}") - - enhanced = "\n".join(parts) + "\n\n# DOCUMENT SUMMARY\n\n" + summary_text - return enhanced, await asyncio.to_thread(embed_text, enhanced) - - # Standard summary (Unstructured / LlamaCloud / others) - meta = { - "file_name": file_name, - "etl_service": etl_service, - "document_type": "File Document", - } - return await generate_document_summary(markdown_content, user_llm, meta) - - # --------------------------------------------------------------------------- # Unified save function # --------------------------------------------------------------------------- @@ -90,7 +32,6 @@ async def save_file_document( user_id: str, etl_service: str, connector: dict | None = None, - enable_summary: bool = True, ) -> Document | None: """ Process and store a file document with deduplication and migration support. @@ -106,7 +47,6 @@ async def save_file_document( user_id: ID of the user etl_service: Name of the ETL service (UNSTRUCTURED, LLAMACLOUD, DOCLING) connector: Optional connector info for Google Drive files - enable_summary: Whether to generate an AI summary Returns: Document object if successful, None if duplicate detected @@ -133,24 +73,16 @@ async def save_file_document( if should_skip: return doc - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - if not user_llm: - raise RuntimeError( - f"No long context LLM configured for user {user_id} " - f"in search space {search_space_id}" - ) - - summary_content, summary_embedding = await _generate_summary( - markdown_content, file_name, etl_service, user_llm, enable_summary - ) + document_content = f"File: {file_name}\n\n{markdown_content[:4000]}" + document_embedding = embed_text(document_content) chunks = await create_document_chunks(markdown_content) doc_metadata = {"FILE_NAME": file_name, "ETL_SERVICE": etl_service} if existing_document: existing_document.title = file_name - existing_document.content = summary_content + existing_document.content = document_content existing_document.content_hash = content_hash - existing_document.embedding = summary_embedding + existing_document.embedding = document_embedding existing_document.document_metadata = doc_metadata await safe_set_chunks(session, existing_document, chunks) existing_document.source_markdown = markdown_content @@ -171,8 +103,8 @@ async def save_file_document( title=file_name, document_type=doc_type, document_metadata=doc_metadata, - content=summary_content, - embedding=summary_embedding, + content=document_content, + embedding=document_embedding, chunks=chunks, content_hash=content_hash, unique_identifier_hash=primary_hash, diff --git a/surfsense_backend/app/tasks/document_processors/circleback_processor.py b/surfsense_backend/app/tasks/document_processors/circleback_processor.py index a6b9568b9..ee36d5bc2 100644 --- a/surfsense_backend/app/tasks/document_processors/circleback_processor.py +++ b/surfsense_backend/app/tasks/document_processors/circleback_processor.py @@ -25,11 +25,10 @@ from app.db import ( SearchSourceConnectorType, SearchSpace, ) -from app.services.llm_service import get_document_summary_llm from app.utils.document_converters import ( create_document_chunks, + embed_text, generate_content_hash, - generate_document_summary, generate_unique_identifier_hash, ) @@ -176,34 +175,8 @@ async def add_circleback_meeting_document( # PHASE 3: Process the document content # ======================================================================= - # Get LLM for generating summary - llm = await get_document_summary_llm(session, search_space_id) - if not llm: - logger.warning( - f"No LLM configured for search space {search_space_id}. Using content as summary." - ) - # Use first 1000 chars as summary if no LLM available - summary_content = ( - markdown_content[:1000] + "..." - if len(markdown_content) > 1000 - else markdown_content - ) - summary_embedding = None - else: - # Generate summary with metadata - summary_metadata = { - "meeting_name": meeting_name, - "meeting_id": meeting_id, - "document_type": "Circleback Meeting", - **{ - k: v - for k, v in metadata.items() - if isinstance(v, str | int | float | bool) - }, - } - summary_content, summary_embedding = await generate_document_summary( - markdown_content, llm, summary_metadata - ) + summary_content = markdown_content + summary_embedding = embed_text(summary_content) # Process chunks chunks = await create_document_chunks(markdown_content) @@ -224,8 +197,7 @@ async def add_circleback_meeting_document( document.title = meeting_name document.content = summary_content document.content_hash = content_hash - if summary_embedding is not None: - document.embedding = summary_embedding + document.embedding = summary_embedding document.document_metadata = document_metadata await safe_set_chunks(session, document, chunks) document.source_markdown = markdown_content diff --git a/surfsense_backend/app/tasks/document_processors/extension_processor.py b/surfsense_backend/app/tasks/document_processors/extension_processor.py index 7320ec9fa..bdbc985fa 100644 --- a/surfsense_backend/app/tasks/document_processors/extension_processor.py +++ b/surfsense_backend/app/tasks/document_processors/extension_processor.py @@ -9,12 +9,11 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.db import Document, DocumentType from app.schemas import ExtensionDocumentContent -from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( create_document_chunks, + embed_text, generate_content_hash, - generate_document_summary, generate_unique_identifier_hash, ) @@ -123,26 +122,8 @@ async def add_extension_received_document( f"Content changed for URL {content.metadata.VisitedWebPageURL}. Updating document." ) - # Get user's long context LLM (needed for both create and update) - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - if not user_llm: - raise RuntimeError( - f"No long context LLM configured for user {user_id} in search space {search_space_id}" - ) - - # Generate summary with metadata - document_metadata = { - "session_id": content.metadata.BrowsingSessionId, - "url": content.metadata.VisitedWebPageURL, - "title": content.metadata.VisitedWebPageTitle, - "referrer": content.metadata.VisitedWebPageReffererURL, - "timestamp": content.metadata.VisitedWebPageDateWithTimeInISOString, - "duration_ms": content.metadata.VisitedWebPageVisitDurationInMilliseconds, - "document_type": "Browser Extension Capture", - } - summary_content, summary_embedding = await generate_document_summary( - combined_document_string, user_llm, document_metadata - ) + summary_content = combined_document_string + summary_embedding = embed_text(summary_content) # Process chunks chunks = await create_document_chunks(content.pageContent) diff --git a/surfsense_backend/app/tasks/document_processors/file_processors.py b/surfsense_backend/app/tasks/document_processors/file_processors.py index 137c27cda..805f5554d 100644 --- a/surfsense_backend/app/tasks/document_processors/file_processors.py +++ b/surfsense_backend/app/tasks/document_processors/file_processors.py @@ -10,7 +10,7 @@ from __future__ import annotations import contextlib import logging import os -from dataclasses import dataclass, field +from dataclasses import dataclass from fastapi import HTTPException from sqlalchemy.ext.asyncio import AsyncSession @@ -48,12 +48,6 @@ class _ProcessingContext: notification: Notification | None = None use_vision_llm: bool = False processing_mode: str = "basic" - enable_summary: bool = field(init=False) - - def __post_init__(self) -> None: - self.enable_summary = ( - self.connector.get("enable_summary", True) if self.connector else True - ) # --------------------------------------------------------------------------- @@ -261,7 +255,6 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None: ctx.user_id, etl_result.etl_service, ctx.connector, - enable_summary=ctx.enable_summary, ) if result: @@ -466,7 +459,6 @@ async def process_file_in_background_with_document( log_entry: Log, connector: dict | None = None, notification: Notification | None = None, - should_summarize: bool = False, use_vision_llm: bool = False, processing_mode: str = "basic", ) -> Document | None: @@ -482,7 +474,6 @@ async def process_file_in_background_with_document( from app.indexing_pipeline.adapters.file_upload_adapter import ( UploadDocumentAdapter, ) - from app.services.llm_service import get_user_long_context_llm from app.utils.document_converters import generate_content_hash from .base import check_duplicate_document @@ -522,8 +513,6 @@ async def process_file_in_background_with_document( stage="chunking", ) - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - adapter = UploadDocumentAdapter(session) await adapter.index( markdown_content=markdown_content, @@ -531,8 +520,6 @@ async def process_file_in_background_with_document( etl_service=etl_service, search_space_id=search_space_id, user_id=user_id, - llm=user_llm, - should_summarize=should_summarize, ) if billable_pages > 0: diff --git a/surfsense_backend/app/tasks/document_processors/markdown_processor.py b/surfsense_backend/app/tasks/document_processors/markdown_processor.py index 0ff340c0e..19a4df87d 100644 --- a/surfsense_backend/app/tasks/document_processors/markdown_processor.py +++ b/surfsense_backend/app/tasks/document_processors/markdown_processor.py @@ -8,12 +8,11 @@ from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.db import Document, DocumentStatus, DocumentType -from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( create_document_chunks, + embed_text, generate_content_hash, - generate_document_summary, ) from ._helpers import ( @@ -183,21 +182,8 @@ async def add_received_markdown_file_document( return doc # Content changed - continue to update - # Get user's long context LLM (needed for both create and update) - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - if not user_llm: - raise RuntimeError( - f"No long context LLM configured for user {user_id} in search space {search_space_id}" - ) - - # Generate summary with metadata - document_metadata = { - "file_name": file_name, - "document_type": "Markdown File Document", - } - summary_content, summary_embedding = await generate_document_summary( - file_in_markdown, user_llm, document_metadata - ) + summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}" + summary_embedding = embed_text(summary_content) # Process chunks chunks = await create_document_chunks(file_in_markdown) diff --git a/surfsense_backend/app/tasks/document_processors/youtube_processor.py b/surfsense_backend/app/tasks/document_processors/youtube_processor.py index 0ed2e57d2..96c7bda5f 100644 --- a/surfsense_backend/app/tasks/document_processors/youtube_processor.py +++ b/surfsense_backend/app/tasks/document_processors/youtube_processor.py @@ -17,12 +17,11 @@ from sqlalchemy.ext.asyncio import AsyncSession from youtube_transcript_api import YouTubeTranscriptApi from app.db import Document, DocumentStatus, DocumentType -from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( create_document_chunks, + embed_text, generate_content_hash, - generate_document_summary, generate_unique_identifier_hash, ) from app.utils.proxy_config import get_requests_proxies @@ -355,40 +354,8 @@ async def add_youtube_video_document( await session.commit() return document - # Get LLM for summary generation - await task_logger.log_task_progress( - log_entry, - f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}", - {"stage": "llm_setup"}, - ) - - # Get user's long context LLM - user_llm = await get_user_long_context_llm(session, user_id, search_space_id) - if not user_llm: - raise RuntimeError( - f"No long context LLM configured for user {user_id} in search space {search_space_id}" - ) - - # Generate summary - await task_logger.log_task_progress( - log_entry, - f"Generating summary for video: {video_data.get('title', 'YouTube Video')}", - {"stage": "summary_generation"}, - ) - - # Generate summary with metadata - document_metadata_for_summary = { - "url": url, - "video_id": video_id, - "title": video_data.get("title", "YouTube Video"), - "author": video_data.get("author_name", "Unknown"), - "thumbnail": video_data.get("thumbnail_url", ""), - "document_type": "YouTube Video Document", - "has_transcript": "No captions available" not in transcript_text, - } - summary_content, summary_embedding = await generate_document_summary( - combined_document_string, user_llm, document_metadata_for_summary - ) + summary_content = combined_document_string + summary_embedding = embed_text(summary_content) # Process chunks await task_logger.log_task_progress( diff --git a/surfsense_backend/app/utils/document_converters.py b/surfsense_backend/app/utils/document_converters.py index 059d91806..694ae22ac 100644 --- a/surfsense_backend/app/utils/document_converters.py +++ b/surfsense_backend/app/utils/document_converters.py @@ -9,7 +9,6 @@ from litellm import get_model_info, token_counter from app.config import config from app.db import Chunk, DocumentType -from app.prompts import SUMMARY_PROMPT_TEMPLATE logger = logging.getLogger(__name__) @@ -176,57 +175,6 @@ def optimize_content_for_context_window( return optimized_content -async def generate_document_summary( - content: str, - user_llm, - document_metadata: dict | None = None, -) -> tuple[str, list[float]]: - """ - Generate summary and embedding for document content with metadata. - - Args: - content: Document content - user_llm: User's LLM instance - document_metadata: Optional metadata dictionary to include in summary - - Returns: - Tuple of (enhanced_summary_content, summary_embedding) - """ - # Get model name from user_llm for token counting - model_name = getattr(user_llm, "model", "gpt-3.5-turbo") # Fallback to default - - # Optimize content to fit within context window - optimized_content = optimize_content_for_context_window( - content, document_metadata, model_name - ) - - summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm - content_with_metadata = f"\n\n{document_metadata}\n\n\n\n\n\n{optimized_content}\n\n" - summary_result = await summary_chain.ainvoke({"document": content_with_metadata}) - summary_content = summary_result.content - - # Combine summary with metadata if provided - if document_metadata: - metadata_parts = [] - metadata_parts.append("# DOCUMENT METADATA") - - for key, value in document_metadata.items(): - if value: # Only include non-empty values - formatted_key = key.replace("_", " ").title() - metadata_parts.append(f"**{formatted_key}:** {value}") - - metadata_section = "\n".join(metadata_parts) - enhanced_summary_content = ( - f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}" - ) - else: - enhanced_summary_content = summary_content - - summary_embedding = await asyncio.to_thread(embed_text, enhanced_summary_content) - - return enhanced_summary_content, summary_embedding - - async def create_document_chunks(content: str) -> list[Chunk]: """ Create chunks from document content.