feat(backend): Remove LLM summaries from document indexing

This commit is contained in:
Anish Sarkar 2026-06-04 00:50:19 +05:30
parent 290a9539ef
commit 81fa219b30
17 changed files with 40 additions and 518 deletions

View file

@ -18,8 +18,6 @@ class UploadDocumentAdapter:
etl_service: str, etl_service: str,
search_space_id: int, search_space_id: int,
user_id: str, user_id: str,
llm,
should_summarize: bool = False,
) -> None: ) -> None:
connector_doc = ConnectorDocument( connector_doc = ConnectorDocument(
title=filename, title=filename,
@ -29,9 +27,7 @@ class UploadDocumentAdapter:
search_space_id=search_space_id, search_space_id=search_space_id,
created_by_id=user_id, created_by_id=user_id,
connector_id=None, connector_id=None,
should_summarize=should_summarize,
should_use_code_chunker=False, should_use_code_chunker=False,
fallback_summary=markdown_content[:4000],
metadata={ metadata={
"FILE_NAME": filename, "FILE_NAME": filename,
"ETL_SERVICE": etl_service, "ETL_SERVICE": etl_service,
@ -43,7 +39,7 @@ class UploadDocumentAdapter:
if not documents: if not documents:
raise RuntimeError("prepare_for_indexing returned no documents") raise RuntimeError("prepare_for_indexing returned no documents")
indexed = await self._service.index(documents[0], connector_doc, llm) indexed = await self._service.index(documents[0], connector_doc)
if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY): if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY):
raise RuntimeError(indexed.status.get("reason", "Indexing failed")) raise RuntimeError(indexed.status.get("reason", "Indexing failed"))
@ -51,7 +47,7 @@ class UploadDocumentAdapter:
indexed.content_needs_reindexing = False indexed.content_needs_reindexing = False
await self._session.commit() await self._session.commit()
async def reindex(self, document: Document, llm) -> None: async def reindex(self, document: Document) -> None:
"""Re-index an existing document after its source_markdown has been updated.""" """Re-index an existing document after its source_markdown has been updated."""
if not document.source_markdown: if not document.source_markdown:
raise RuntimeError("Document has no source_markdown to reindex") raise RuntimeError("Document has no source_markdown to reindex")
@ -66,15 +62,13 @@ class UploadDocumentAdapter:
search_space_id=document.search_space_id, search_space_id=document.search_space_id,
created_by_id=str(document.created_by_id), created_by_id=str(document.created_by_id),
connector_id=document.connector_id, connector_id=document.connector_id,
should_summarize=True,
should_use_code_chunker=False, should_use_code_chunker=False,
fallback_summary=document.source_markdown[:4000],
metadata=metadata, metadata=metadata,
) )
document.content_hash = compute_content_hash(connector_doc) document.content_hash = compute_content_hash(connector_doc)
indexed = await self._service.index(document, connector_doc, llm) indexed = await self._service.index(document, connector_doc)
if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY): if not DocumentStatus.is_state(indexed.status, DocumentStatus.READY):
raise RuntimeError(indexed.status.get("reason", "Reindexing failed")) raise RuntimeError(indexed.status.get("reason", "Reindexing failed"))

View file

@ -11,9 +11,7 @@ class ConnectorDocument(BaseModel):
unique_id: str unique_id: str
document_type: DocumentType document_type: DocumentType
search_space_id: int = Field(gt=0) search_space_id: int = Field(gt=0)
should_summarize: bool = True
should_use_code_chunker: bool = False should_use_code_chunker: bool = False
fallback_summary: str | None = None
metadata: dict = {} metadata: dict = {}
connector_id: int | None = None connector_id: int | None = None
created_by_id: str created_by_id: str

View file

@ -1,30 +0,0 @@
from app.prompts import SUMMARY_PROMPT_TEMPLATE
from app.utils.document_converters import optimize_content_for_context_window
async def summarize_document(
source_markdown: str, llm, metadata: dict | None = None
) -> str:
"""Generate a text summary of a document using an LLM, prefixed with metadata when provided."""
model_name = getattr(llm, "model", "gpt-3.5-turbo")
optimized_content = optimize_content_for_context_window(
source_markdown, metadata, model_name
)
summary_chain = SUMMARY_PROMPT_TEMPLATE | llm
content_with_metadata = (
f"<DOCUMENT><DOCUMENT_METADATA>\n\n{metadata}\n\n</DOCUMENT_METADATA>"
f"\n\n<DOCUMENT_CONTENT>\n\n{optimized_content}\n\n</DOCUMENT_CONTENT></DOCUMENT>"
)
summary_result = await summary_chain.ainvoke({"document": content_with_metadata})
summary_content = summary_result.content
if metadata:
metadata_parts = ["# DOCUMENT METADATA"]
for key, value in metadata.items():
if value:
metadata_parts.append(f"**{key.replace('_', ' ').title()}:** {value}")
metadata_section = "\n".join(metadata_parts)
return f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}"
return summary_content

View file

@ -31,7 +31,6 @@ from app.indexing_pipeline.document_persistence import (
attach_chunks_to_document, attach_chunks_to_document,
rollback_and_persist_failure, rollback_and_persist_failure,
) )
from app.indexing_pipeline.document_summarizer import summarize_document
from app.indexing_pipeline.exceptions import ( from app.indexing_pipeline.exceptions import (
EMBEDDING_ERRORS, EMBEDDING_ERRORS,
PERMANENT_LLM_ERRORS, PERMANENT_LLM_ERRORS,
@ -203,9 +202,7 @@ class IndexingPipelineService:
await self.session.commit() await self.session.commit()
async def index_batch( async def index_batch(self, connector_docs: list[ConnectorDocument]) -> list[Document]:
self, connector_docs: list[ConnectorDocument], llm
) -> list[Document]:
"""Convenience method: prepare_for_indexing then index each document. """Convenience method: prepare_for_indexing then index each document.
Indexers that need heartbeat callbacks or custom per-document logic Indexers that need heartbeat callbacks or custom per-document logic
@ -218,7 +215,7 @@ class IndexingPipelineService:
connector_doc = doc_map.get(document.unique_identifier_hash) connector_doc = doc_map.get(document.unique_identifier_hash)
if connector_doc is None: if connector_doc is None:
continue continue
result = await self.index(document, connector_doc, llm) result = await self.index(document, connector_doc)
results.append(result) results.append(result)
return results return results
@ -350,11 +347,9 @@ class IndexingPipelineService:
await self.session.rollback() await self.session.rollback()
return [] return []
async def index( async def index(self, document: Document, connector_doc: ConnectorDocument) -> Document:
self, document: Document, connector_doc: ConnectorDocument, llm
) -> Document:
""" """
Run summarization, embedding, and chunking for a document and persist the results. Run deterministic content storage, embedding, and chunking for a document.
""" """
ctx = PipelineLogContext( ctx = PipelineLogContext(
connector_id=connector_doc.connector_id, connector_id=connector_doc.connector_id,
@ -379,20 +374,7 @@ class IndexingPipelineService:
document.status = DocumentStatus.processing() document.status = DocumentStatus.processing()
await self.session.commit() await self.session.commit()
t_step = time.perf_counter() content = connector_doc.source_markdown
if connector_doc.should_summarize and llm is not None:
content = await summarize_document(
connector_doc.source_markdown, llm, connector_doc.metadata
)
perf.info(
"[indexing] summarize_document doc=%d in %.3fs",
document.id,
time.perf_counter() - t_step,
)
elif connector_doc.should_summarize and connector_doc.fallback_summary:
content = connector_doc.fallback_summary
else:
content = connector_doc.source_markdown
await self.session.execute( await self.session.execute(
delete(Chunk).where(Chunk.document_id == document.id) delete(Chunk).where(Chunk.document_id == document.id)
@ -523,7 +505,6 @@ class IndexingPipelineService:
async def index_batch_parallel( async def index_batch_parallel(
self, self,
connector_docs: list[ConnectorDocument], connector_docs: list[ConnectorDocument],
get_llm: Callable[[AsyncSession], Awaitable],
*, *,
max_concurrency: int = 4, max_concurrency: int = 4,
on_heartbeat: Callable[[int], Awaitable[None]] | None = None, on_heartbeat: Callable[[int], Awaitable[None]] | None = None,
@ -532,8 +513,8 @@ class IndexingPipelineService:
"""Index documents in parallel with bounded concurrency. """Index documents in parallel with bounded concurrency.
Phase 1 (serial): prepare_for_indexing using self.session. Phase 1 (serial): prepare_for_indexing using self.session.
Phase 2 (parallel): index each document in an isolated session, Phase 2 (parallel): index each document in an isolated session, bounded
bounded by a semaphore to avoid overwhelming APIs/DB. by a semaphore to avoid overwhelming embedding APIs/DB.
""" """
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
perf = get_perf_logger() perf = get_perf_logger()
@ -577,9 +558,8 @@ class IndexingPipelineService:
failed_count += 1 failed_count += 1
return document return document
llm = await get_llm(isolated_session)
iso_pipeline = IndexingPipelineService(isolated_session) iso_pipeline = IndexingPipelineService(isolated_session)
result = await iso_pipeline.index(refetched, connector_doc, llm) result = await iso_pipeline.index(refetched, connector_doc)
async with lock: async with lock:
if DocumentStatus.is_state( if DocumentStatus.is_state(

View file

@ -125,7 +125,6 @@ async def create_documents(
async def create_documents_file_upload( async def create_documents_file_upload(
files: list[UploadFile], files: list[UploadFile],
search_space_id: int = Form(...), search_space_id: int = Form(...),
should_summarize: bool = Form(False),
use_vision_llm: bool = Form(False), use_vision_llm: bool = Form(False),
processing_mode: str = Form("basic"), processing_mode: str = Form("basic"),
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
@ -309,7 +308,6 @@ async def create_documents_file_upload(
filename=filename, filename=filename,
search_space_id=search_space_id, search_space_id=search_space_id,
user_id=str(user.id), user_id=str(user.id),
should_summarize=should_summarize,
use_vision_llm=use_vision_llm, use_vision_llm=use_vision_llm,
processing_mode=validated_mode.value, processing_mode=validated_mode.value,
) )
@ -1586,7 +1584,6 @@ async def folder_upload(
search_space_id: int = Form(...), search_space_id: int = Form(...),
relative_paths: str = Form(...), relative_paths: str = Form(...),
root_folder_id: int | None = Form(None), root_folder_id: int | None = Form(None),
enable_summary: bool = Form(False),
use_vision_llm: bool = Form(False), use_vision_llm: bool = Form(False),
processing_mode: str = Form("basic"), processing_mode: str = Form("basic"),
session: AsyncSession = Depends(get_async_session), session: AsyncSession = Depends(get_async_session),
@ -1719,7 +1716,6 @@ async def folder_upload(
user_id=str(user.id), user_id=str(user.id),
folder_name=folder_name, folder_name=folder_name,
root_folder_id=root_folder_id, root_folder_id=root_folder_id,
enable_summary=enable_summary,
use_vision_llm=use_vision_llm, use_vision_llm=use_vision_llm,
file_mappings=list(file_mappings), file_mappings=list(file_mappings),
processing_mode=validated_mode.value, processing_mode=validated_mode.value,

View file

@ -191,149 +191,6 @@ class DoclingService:
logger.error(f"Full traceback: {traceback.format_exc()}") logger.error(f"Full traceback: {traceback.format_exc()}")
raise RuntimeError(f"Docling processing failed: {e}") from e raise RuntimeError(f"Docling processing failed: {e}") from e
async def process_large_document_summary(
self, content: str, llm, document_title: str = "Document"
) -> str:
"""
Process large documents using chunked LLM summarization.
Args:
content: The full document content
llm: The language model to use for summarization
document_title: Title of the document for context
Returns:
Final summary of the document
"""
# Large document threshold (100K characters ≈ 25K tokens)
large_document_threshold = 100_000
if len(content) <= large_document_threshold:
# For smaller documents, use direct processing
logger.info(
f"📄 Document size: {len(content)} chars - using direct processing"
)
from app.prompts import SUMMARY_PROMPT_TEMPLATE
summary_chain = SUMMARY_PROMPT_TEMPLATE | llm
result = await summary_chain.ainvoke({"document": content})
return result.content
logger.info(
f"📚 Large document detected: {len(content)} chars - using chunked processing"
)
# Import chunker from config
# Create LLM-optimized chunks (8K tokens max for safety)
from chonkie import OverlapRefinery, RecursiveChunker
from langchain_core.prompts import PromptTemplate
llm_chunker = RecursiveChunker(
chunk_size=8000 # Conservative for most LLMs
)
# Apply overlap refinery for context preservation (10% overlap = 800 tokens)
overlap_refinery = OverlapRefinery(
context_size=0.1, # 10% overlap for context preservation
method="suffix", # Add next chunk context to current chunk
)
# First chunk the content, then apply overlap refinery
initial_chunks = llm_chunker.chunk(content)
chunks = overlap_refinery.refine(initial_chunks)
total_chunks = len(chunks)
logger.info(f"📄 Split into {total_chunks} chunks for LLM processing")
# Template for chunk processing
chunk_template = PromptTemplate(
input_variables=["chunk", "chunk_number", "total_chunks"],
template="""<INSTRUCTIONS>
You are summarizing chunk {chunk_number} of {total_chunks} from a large document.
Create a comprehensive summary of this document chunk. Focus on:
- Key concepts, facts, and information
- Important details and context
- Main topics and themes
Provide a clear, structured summary that captures the essential content.
Chunk {chunk_number}/{total_chunks}:
<document_chunk>
{chunk}
</document_chunk>
</INSTRUCTIONS>""",
)
# Process each chunk individually
chunk_summaries = []
for i, chunk in enumerate(chunks, 1):
try:
logger.info(
f"🔄 Processing chunk {i}/{total_chunks} ({len(chunk.text)} chars)"
)
chunk_chain = chunk_template | llm
chunk_result = await chunk_chain.ainvoke(
{
"chunk": chunk.text,
"chunk_number": i,
"total_chunks": total_chunks,
}
)
chunk_summary = chunk_result.content
chunk_summaries.append(f"=== Section {i} ===\n{chunk_summary}")
logger.info(f"✅ Completed chunk {i}/{total_chunks}")
except Exception as e:
logger.error(f"❌ Failed to process chunk {i}/{total_chunks}: {e}")
chunk_summaries.append(f"=== Section {i} ===\n[Processing failed]")
# Combine summaries into final document summary
logger.info(f"🔄 Combining {len(chunk_summaries)} chunk summaries")
try:
combine_template = PromptTemplate(
input_variables=["summaries", "document_title"],
template="""<INSTRUCTIONS>
You are combining multiple section summaries into a final comprehensive document summary.
Create a unified, coherent summary from the following section summaries of "{document_title}".
Ensure:
- Logical flow and organization
- No redundancy or repetition
- Comprehensive coverage of all key points
- Professional, objective tone
<section_summaries>
{summaries}
</section_summaries>
</INSTRUCTIONS>""",
)
combined_summaries = "\n\n".join(chunk_summaries)
combine_chain = combine_template | llm
final_result = await combine_chain.ainvoke(
{"summaries": combined_summaries, "document_title": document_title}
)
final_summary = final_result.content
logger.info(
f"✅ Large document processing complete: {len(final_summary)} chars summary"
)
return final_summary
except Exception as e:
logger.error(f"❌ Failed to combine summaries: {e}")
# Fallback: return concatenated chunk summaries
fallback_summary = "\n\n".join(chunk_summaries)
logger.warning("⚠️ Using fallback combined summary")
return fallback_summary
def create_docling_service() -> DoclingService: def create_docling_service() -> DoclingService:
"""Create a Docling service instance.""" """Create a Docling service instance."""

View file

@ -233,18 +233,6 @@ async def _resolve_attachment_vision_llm(
return await get_vision_llm(session, search_space_id) return await get_vision_llm(session, search_space_id)
async def _resolve_summary_llm(
session: AsyncSession, *, user_id: str, search_space_id: int, should_summarize: bool
):
"""Fetch summary LLM only when indexing summary is enabled."""
if not should_summarize:
return None
from app.services.llm_service import get_user_long_context_llm
return await get_user_long_context_llm(session, user_id, search_space_id)
def _require_extracted_attachment_content( def _require_extracted_attachment_content(
*, content: str, etl_meta: dict[str, Any], path: str *, content: str, etl_meta: dict[str, Any], path: str
) -> str: ) -> str:
@ -349,13 +337,6 @@ async def upsert_note(
path=payload.path, path=payload.path,
) )
llm = await _resolve_summary_llm(
session,
user_id=str(user_id),
search_space_id=search_space_id,
should_summarize=connector.enable_summary,
)
document_string = _build_document_string( document_string = _build_document_string(
payload, vault_name, content_override=content_for_index payload, vault_name, content_override=content_for_index
) )
@ -374,8 +355,6 @@ async def upsert_note(
search_space_id=search_space_id, search_space_id=search_space_id,
connector_id=connector.id, connector_id=connector.id,
created_by_id=str(user_id), created_by_id=str(user_id),
should_summarize=connector.enable_summary,
fallback_summary=f"Obsidian Note: {payload.name}\n\n{content_for_index}",
metadata=metadata, metadata=metadata,
) )
@ -388,7 +367,7 @@ async def upsert_note(
document = prepared[0] document = prepared[0]
return await pipeline.index(document, connector_doc, llm) return await pipeline.index(document, connector_doc)
async def rename_note( async def rename_note(

View file

@ -18,7 +18,6 @@ class TaskDispatcher(Protocol):
filename: str, filename: str,
search_space_id: int, search_space_id: int,
user_id: str, user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False, use_vision_llm: bool = False,
processing_mode: str = "basic", processing_mode: str = "basic",
) -> None: ... ) -> None: ...
@ -35,7 +34,6 @@ class CeleryTaskDispatcher:
filename: str, filename: str,
search_space_id: int, search_space_id: int,
user_id: str, user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False, use_vision_llm: bool = False,
processing_mode: str = "basic", processing_mode: str = "basic",
) -> None: ) -> None:
@ -49,7 +47,6 @@ class CeleryTaskDispatcher:
filename=filename, filename=filename,
search_space_id=search_space_id, search_space_id=search_space_id,
user_id=user_id, user_id=user_id,
should_summarize=should_summarize,
use_vision_llm=use_vision_llm, use_vision_llm=use_vision_llm,
processing_mode=processing_mode, processing_mode=processing_mode,
) )

View file

@ -9,7 +9,6 @@ from sqlalchemy.orm import selectinload
from app.celery_app import celery_app from app.celery_app import celery_app
from app.db import Document from app.db import Document
from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService from app.services.task_logging_service import TaskLoggingService
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
@ -68,12 +67,8 @@ async def _reindex_document(document_id: int, user_id: str):
logger.info(f"Reindexing document {document_id} ({document.title})") logger.info(f"Reindexing document {document_id} ({document.title})")
user_llm = await get_user_long_context_llm(
session, user_id, document.search_space_id
)
adapter = UploadDocumentAdapter(session) adapter = UploadDocumentAdapter(session)
await adapter.reindex(document=document, llm=user_llm) await adapter.reindex(document=document)
await task_logger.log_task_success( await task_logger.log_task_success(
log_entry, log_entry,

View file

@ -765,7 +765,6 @@ def process_file_upload_with_document_task(
filename: str, filename: str,
search_space_id: int, search_space_id: int,
user_id: str, user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False, use_vision_llm: bool = False,
processing_mode: str = "basic", processing_mode: str = "basic",
): ):
@ -782,7 +781,6 @@ def process_file_upload_with_document_task(
filename: Original filename filename: Original filename
search_space_id: ID of the search space search_space_id: ID of the search space
user_id: ID of the user user_id: ID of the user
should_summarize: Whether to generate an LLM summary
""" """
import traceback import traceback
@ -814,7 +812,6 @@ def process_file_upload_with_document_task(
filename, filename,
search_space_id, search_space_id,
user_id, user_id,
should_summarize=should_summarize,
use_vision_llm=use_vision_llm, use_vision_llm=use_vision_llm,
processing_mode=processing_mode, processing_mode=processing_mode,
) )
@ -850,7 +847,6 @@ async def _process_file_with_document(
filename: str, filename: str,
search_space_id: int, search_space_id: int,
user_id: str, user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False, use_vision_llm: bool = False,
processing_mode: str = "basic", processing_mode: str = "basic",
): ):
@ -954,7 +950,6 @@ async def _process_file_with_document(
task_logger=task_logger, task_logger=task_logger,
log_entry=log_entry, log_entry=log_entry,
notification=notification, notification=notification,
should_summarize=should_summarize,
use_vision_llm=use_vision_llm, use_vision_llm=use_vision_llm,
processing_mode=processing_mode, processing_mode=processing_mode,
) )
@ -1258,7 +1253,6 @@ def index_local_folder_task(
exclude_patterns: list[str] | None = None, exclude_patterns: list[str] | None = None,
file_extensions: list[str] | None = None, file_extensions: list[str] | None = None,
root_folder_id: int | None = None, root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_paths: list[str] | None = None, target_file_paths: list[str] | None = None,
): ):
"""Celery task to index a local folder. Config is passed directly — no connector row.""" """Celery task to index a local folder. Config is passed directly — no connector row."""
@ -1271,7 +1265,6 @@ def index_local_folder_task(
exclude_patterns=exclude_patterns, exclude_patterns=exclude_patterns,
file_extensions=file_extensions, file_extensions=file_extensions,
root_folder_id=root_folder_id, root_folder_id=root_folder_id,
enable_summary=enable_summary,
target_file_paths=target_file_paths, target_file_paths=target_file_paths,
) )
) )
@ -1285,7 +1278,6 @@ async def _index_local_folder_async(
exclude_patterns: list[str] | None = None, exclude_patterns: list[str] | None = None,
file_extensions: list[str] | None = None, file_extensions: list[str] | None = None,
root_folder_id: int | None = None, root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_paths: list[str] | None = None, target_file_paths: list[str] | None = None,
): ):
"""Run local folder indexing with notification + heartbeat.""" """Run local folder indexing with notification + heartbeat."""
@ -1343,8 +1335,7 @@ async def _index_local_folder_async(
exclude_patterns=exclude_patterns, exclude_patterns=exclude_patterns,
file_extensions=file_extensions, file_extensions=file_extensions,
root_folder_id=root_folder_id, root_folder_id=root_folder_id,
enable_summary=enable_summary, target_file_paths=target_file_paths,
target_file_paths=target_file_paths,
on_heartbeat_callback=_heartbeat_progress on_heartbeat_callback=_heartbeat_progress
if (is_batch or is_full_scan) if (is_batch or is_full_scan)
else None, else None,
@ -1400,7 +1391,6 @@ def index_uploaded_folder_files_task(
user_id: str, user_id: str,
folder_name: str, folder_name: str,
root_folder_id: int, root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict], file_mappings: list[dict],
use_vision_llm: bool = False, use_vision_llm: bool = False,
processing_mode: str = "basic", processing_mode: str = "basic",
@ -1412,7 +1402,6 @@ def index_uploaded_folder_files_task(
user_id=user_id, user_id=user_id,
folder_name=folder_name, folder_name=folder_name,
root_folder_id=root_folder_id, root_folder_id=root_folder_id,
enable_summary=enable_summary,
file_mappings=file_mappings, file_mappings=file_mappings,
use_vision_llm=use_vision_llm, use_vision_llm=use_vision_llm,
processing_mode=processing_mode, processing_mode=processing_mode,
@ -1425,7 +1414,6 @@ async def _index_uploaded_folder_files_async(
user_id: str, user_id: str,
folder_name: str, folder_name: str,
root_folder_id: int, root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict], file_mappings: list[dict],
use_vision_llm: bool = False, use_vision_llm: bool = False,
processing_mode: str = "basic", processing_mode: str = "basic",
@ -1475,8 +1463,7 @@ async def _index_uploaded_folder_files_async(
user_id=user_id, user_id=user_id,
folder_name=folder_name, folder_name=folder_name,
root_folder_id=root_folder_id, root_folder_id=root_folder_id,
enable_summary=enable_summary, file_mappings=file_mappings,
file_mappings=file_mappings,
on_heartbeat_callback=_heartbeat_progress, on_heartbeat_callback=_heartbeat_progress,
use_vision_llm=use_vision_llm, use_vision_llm=use_vision_llm,
processing_mode=processing_mode, processing_mode=processing_mode,
@ -1563,12 +1550,10 @@ async def _ai_sort_search_space_async(search_space_id: int, user_id: str):
t_start = time.perf_counter() t_start = time.perf_counter()
try: try:
from app.services.ai_file_sort_service import ai_sort_all_documents from app.services.ai_file_sort_service import ai_sort_all_documents
from app.services.llm_service import get_document_summary_llm from app.services.llm_service import get_agent_llm
async with get_celery_session_maker()() as session: async with get_celery_session_maker()() as session:
llm = await get_document_summary_llm( llm = await get_agent_llm(session, search_space_id, disable_streaming=True)
session, search_space_id, disable_streaming=True
)
if llm is None: if llm is None:
logger.warning( logger.warning(
"No LLM configured for search_space=%d, skipping AI sort", "No LLM configured for search_space=%d, skipping AI sort",
@ -1604,7 +1589,7 @@ def ai_sort_document_task(self, search_space_id: int, user_id: str, document_id:
async def _ai_sort_document_async(search_space_id: int, user_id: str, document_id: int): async def _ai_sort_document_async(search_space_id: int, user_id: str, document_id: int):
from app.db import Document from app.db import Document
from app.services.ai_file_sort_service import ai_sort_document from app.services.ai_file_sort_service import ai_sort_document
from app.services.llm_service import get_document_summary_llm from app.services.llm_service import get_agent_llm
async with get_celery_session_maker()() as session: async with get_celery_session_maker()() as session:
document = await session.get(Document, document_id) document = await session.get(Document, document_id)
@ -1612,9 +1597,7 @@ async def _ai_sort_document_async(search_space_id: int, user_id: str, document_i
logger.warning("Document %d not found, skipping AI sort", document_id) logger.warning("Document %d not found, skipping AI sort", document_id)
return return
llm = await get_document_summary_llm( llm = await get_agent_llm(session, search_space_id, disable_streaming=True)
session, search_space_id, disable_streaming=True
)
if llm is None: if llm is None:
logger.warning( logger.warning(
"No LLM for search_space=%d, skipping AI sort of doc=%d", "No LLM for search_space=%d, skipping AI sort of doc=%d",

View file

@ -1,20 +1,15 @@
""" """Unified document save/update logic for file processors."""
Unified document save/update logic for file processors.
"""
import asyncio
import logging import logging
from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentStatus, DocumentType from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.utils.document_converters import ( from app.utils.document_converters import (
create_document_chunks, create_document_chunks,
embed_text, embed_text,
generate_content_hash, generate_content_hash,
generate_document_summary,
) )
from ._helpers import ( from ._helpers import (
@ -24,59 +19,6 @@ from ._helpers import (
) )
from .base import get_current_timestamp, safe_set_chunks from .base import get_current_timestamp, safe_set_chunks
# ---------------------------------------------------------------------------
# Summary generation
# ---------------------------------------------------------------------------
async def _generate_summary(
markdown_content: str,
file_name: str,
etl_service: str,
user_llm,
enable_summary: bool,
) -> tuple[str, list[float]]:
"""
Generate a document summary and embedding.
Docling uses its own large-document summary strategy; other ETL services
use the standard ``generate_document_summary`` helper.
"""
if not enable_summary:
summary = f"File: {file_name}\n\n{markdown_content[:4000]}"
return summary, await asyncio.to_thread(embed_text, summary)
if etl_service == "DOCLING":
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
summary_text = await docling_service.process_large_document_summary(
content=markdown_content, llm=user_llm, document_title=file_name
)
meta = {
"file_name": file_name,
"etl_service": etl_service,
"document_type": "File Document",
}
parts = ["# DOCUMENT METADATA"]
for key, value in meta.items():
if value:
formatted_key = key.replace("_", " ").title()
parts.append(f"**{formatted_key}:** {value}")
enhanced = "\n".join(parts) + "\n\n# DOCUMENT SUMMARY\n\n" + summary_text
return enhanced, await asyncio.to_thread(embed_text, enhanced)
# Standard summary (Unstructured / LlamaCloud / others)
meta = {
"file_name": file_name,
"etl_service": etl_service,
"document_type": "File Document",
}
return await generate_document_summary(markdown_content, user_llm, meta)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# Unified save function # Unified save function
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -90,7 +32,6 @@ async def save_file_document(
user_id: str, user_id: str,
etl_service: str, etl_service: str,
connector: dict | None = None, connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None: ) -> Document | None:
""" """
Process and store a file document with deduplication and migration support. Process and store a file document with deduplication and migration support.
@ -106,7 +47,6 @@ async def save_file_document(
user_id: ID of the user user_id: ID of the user
etl_service: Name of the ETL service (UNSTRUCTURED, LLAMACLOUD, DOCLING) etl_service: Name of the ETL service (UNSTRUCTURED, LLAMACLOUD, DOCLING)
connector: Optional connector info for Google Drive files connector: Optional connector info for Google Drive files
enable_summary: Whether to generate an AI summary
Returns: Returns:
Document object if successful, None if duplicate detected Document object if successful, None if duplicate detected
@ -133,24 +73,16 @@ async def save_file_document(
if should_skip: if should_skip:
return doc return doc
user_llm = await get_user_long_context_llm(session, user_id, search_space_id) document_content = f"File: {file_name}\n\n{markdown_content[:4000]}"
if not user_llm: document_embedding = embed_text(document_content)
raise RuntimeError(
f"No long context LLM configured for user {user_id} "
f"in search space {search_space_id}"
)
summary_content, summary_embedding = await _generate_summary(
markdown_content, file_name, etl_service, user_llm, enable_summary
)
chunks = await create_document_chunks(markdown_content) chunks = await create_document_chunks(markdown_content)
doc_metadata = {"FILE_NAME": file_name, "ETL_SERVICE": etl_service} doc_metadata = {"FILE_NAME": file_name, "ETL_SERVICE": etl_service}
if existing_document: if existing_document:
existing_document.title = file_name existing_document.title = file_name
existing_document.content = summary_content existing_document.content = document_content
existing_document.content_hash = content_hash existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding existing_document.embedding = document_embedding
existing_document.document_metadata = doc_metadata existing_document.document_metadata = doc_metadata
await safe_set_chunks(session, existing_document, chunks) await safe_set_chunks(session, existing_document, chunks)
existing_document.source_markdown = markdown_content existing_document.source_markdown = markdown_content
@ -171,8 +103,8 @@ async def save_file_document(
title=file_name, title=file_name,
document_type=doc_type, document_type=doc_type,
document_metadata=doc_metadata, document_metadata=doc_metadata,
content=summary_content, content=document_content,
embedding=summary_embedding, embedding=document_embedding,
chunks=chunks, chunks=chunks,
content_hash=content_hash, content_hash=content_hash,
unique_identifier_hash=primary_hash, unique_identifier_hash=primary_hash,

View file

@ -25,11 +25,10 @@ from app.db import (
SearchSourceConnectorType, SearchSourceConnectorType,
SearchSpace, SearchSpace,
) )
from app.services.llm_service import get_document_summary_llm
from app.utils.document_converters import ( from app.utils.document_converters import (
create_document_chunks, create_document_chunks,
embed_text,
generate_content_hash, generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash, generate_unique_identifier_hash,
) )
@ -176,34 +175,8 @@ async def add_circleback_meeting_document(
# PHASE 3: Process the document content # PHASE 3: Process the document content
# ======================================================================= # =======================================================================
# Get LLM for generating summary summary_content = markdown_content
llm = await get_document_summary_llm(session, search_space_id) summary_embedding = embed_text(summary_content)
if not llm:
logger.warning(
f"No LLM configured for search space {search_space_id}. Using content as summary."
)
# Use first 1000 chars as summary if no LLM available
summary_content = (
markdown_content[:1000] + "..."
if len(markdown_content) > 1000
else markdown_content
)
summary_embedding = None
else:
# Generate summary with metadata
summary_metadata = {
"meeting_name": meeting_name,
"meeting_id": meeting_id,
"document_type": "Circleback Meeting",
**{
k: v
for k, v in metadata.items()
if isinstance(v, str | int | float | bool)
},
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, llm, summary_metadata
)
# Process chunks # Process chunks
chunks = await create_document_chunks(markdown_content) chunks = await create_document_chunks(markdown_content)
@ -224,8 +197,7 @@ async def add_circleback_meeting_document(
document.title = meeting_name document.title = meeting_name
document.content = summary_content document.content = summary_content
document.content_hash = content_hash document.content_hash = content_hash
if summary_embedding is not None: document.embedding = summary_embedding
document.embedding = summary_embedding
document.document_metadata = document_metadata document.document_metadata = document_metadata
await safe_set_chunks(session, document, chunks) await safe_set_chunks(session, document, chunks)
document.source_markdown = markdown_content document.source_markdown = markdown_content

View file

@ -9,12 +9,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentType from app.db import Document, DocumentType
from app.schemas import ExtensionDocumentContent from app.schemas import ExtensionDocumentContent
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import ( from app.utils.document_converters import (
create_document_chunks, create_document_chunks,
embed_text,
generate_content_hash, generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash, generate_unique_identifier_hash,
) )
@ -123,26 +122,8 @@ async def add_extension_received_document(
f"Content changed for URL {content.metadata.VisitedWebPageURL}. Updating document." f"Content changed for URL {content.metadata.VisitedWebPageURL}. Updating document."
) )
# Get user's long context LLM (needed for both create and update) summary_content = combined_document_string
user_llm = await get_user_long_context_llm(session, user_id, search_space_id) summary_embedding = embed_text(summary_content)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} in search space {search_space_id}"
)
# Generate summary with metadata
document_metadata = {
"session_id": content.metadata.BrowsingSessionId,
"url": content.metadata.VisitedWebPageURL,
"title": content.metadata.VisitedWebPageTitle,
"referrer": content.metadata.VisitedWebPageReffererURL,
"timestamp": content.metadata.VisitedWebPageDateWithTimeInISOString,
"duration_ms": content.metadata.VisitedWebPageVisitDurationInMilliseconds,
"document_type": "Browser Extension Capture",
}
summary_content, summary_embedding = await generate_document_summary(
combined_document_string, user_llm, document_metadata
)
# Process chunks # Process chunks
chunks = await create_document_chunks(content.pageContent) chunks = await create_document_chunks(content.pageContent)

View file

@ -10,7 +10,7 @@ from __future__ import annotations
import contextlib import contextlib
import logging import logging
import os import os
from dataclasses import dataclass, field from dataclasses import dataclass
from fastapi import HTTPException from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
@ -48,12 +48,6 @@ class _ProcessingContext:
notification: Notification | None = None notification: Notification | None = None
use_vision_llm: bool = False use_vision_llm: bool = False
processing_mode: str = "basic" processing_mode: str = "basic"
enable_summary: bool = field(init=False)
def __post_init__(self) -> None:
self.enable_summary = (
self.connector.get("enable_summary", True) if self.connector else True
)
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
@ -261,7 +255,6 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
ctx.user_id, ctx.user_id,
etl_result.etl_service, etl_result.etl_service,
ctx.connector, ctx.connector,
enable_summary=ctx.enable_summary,
) )
if result: if result:
@ -466,7 +459,6 @@ async def process_file_in_background_with_document(
log_entry: Log, log_entry: Log,
connector: dict | None = None, connector: dict | None = None,
notification: Notification | None = None, notification: Notification | None = None,
should_summarize: bool = False,
use_vision_llm: bool = False, use_vision_llm: bool = False,
processing_mode: str = "basic", processing_mode: str = "basic",
) -> Document | None: ) -> Document | None:
@ -482,7 +474,6 @@ async def process_file_in_background_with_document(
from app.indexing_pipeline.adapters.file_upload_adapter import ( from app.indexing_pipeline.adapters.file_upload_adapter import (
UploadDocumentAdapter, UploadDocumentAdapter,
) )
from app.services.llm_service import get_user_long_context_llm
from app.utils.document_converters import generate_content_hash from app.utils.document_converters import generate_content_hash
from .base import check_duplicate_document from .base import check_duplicate_document
@ -522,8 +513,6 @@ async def process_file_in_background_with_document(
stage="chunking", stage="chunking",
) )
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
adapter = UploadDocumentAdapter(session) adapter = UploadDocumentAdapter(session)
await adapter.index( await adapter.index(
markdown_content=markdown_content, markdown_content=markdown_content,
@ -531,8 +520,6 @@ async def process_file_in_background_with_document(
etl_service=etl_service, etl_service=etl_service,
search_space_id=search_space_id, search_space_id=search_space_id,
user_id=user_id, user_id=user_id,
llm=user_llm,
should_summarize=should_summarize,
) )
if billable_pages > 0: if billable_pages > 0:

View file

@ -8,12 +8,11 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentStatus, DocumentType from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import ( from app.utils.document_converters import (
create_document_chunks, create_document_chunks,
embed_text,
generate_content_hash, generate_content_hash,
generate_document_summary,
) )
from ._helpers import ( from ._helpers import (
@ -183,21 +182,8 @@ async def add_received_markdown_file_document(
return doc return doc
# Content changed - continue to update # Content changed - continue to update
# Get user's long context LLM (needed for both create and update) summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}"
user_llm = await get_user_long_context_llm(session, user_id, search_space_id) summary_embedding = embed_text(summary_content)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} in search space {search_space_id}"
)
# Generate summary with metadata
document_metadata = {
"file_name": file_name,
"document_type": "Markdown File Document",
}
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
# Process chunks # Process chunks
chunks = await create_document_chunks(file_in_markdown) chunks = await create_document_chunks(file_in_markdown)

View file

@ -17,12 +17,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from youtube_transcript_api import YouTubeTranscriptApi from youtube_transcript_api import YouTubeTranscriptApi
from app.db import Document, DocumentStatus, DocumentType from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import ( from app.utils.document_converters import (
create_document_chunks, create_document_chunks,
embed_text,
generate_content_hash, generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash, generate_unique_identifier_hash,
) )
from app.utils.proxy_config import get_requests_proxies from app.utils.proxy_config import get_requests_proxies
@ -355,40 +354,8 @@ async def add_youtube_video_document(
await session.commit() await session.commit()
return document return document
# Get LLM for summary generation summary_content = combined_document_string
await task_logger.log_task_progress( summary_embedding = embed_text(summary_content)
log_entry,
f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}",
{"stage": "llm_setup"},
)
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} in search space {search_space_id}"
)
# Generate summary
await task_logger.log_task_progress(
log_entry,
f"Generating summary for video: {video_data.get('title', 'YouTube Video')}",
{"stage": "summary_generation"},
)
# Generate summary with metadata
document_metadata_for_summary = {
"url": url,
"video_id": video_id,
"title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
"document_type": "YouTube Video Document",
"has_transcript": "No captions available" not in transcript_text,
}
summary_content, summary_embedding = await generate_document_summary(
combined_document_string, user_llm, document_metadata_for_summary
)
# Process chunks # Process chunks
await task_logger.log_task_progress( await task_logger.log_task_progress(

View file

@ -9,7 +9,6 @@ from litellm import get_model_info, token_counter
from app.config import config from app.config import config
from app.db import Chunk, DocumentType from app.db import Chunk, DocumentType
from app.prompts import SUMMARY_PROMPT_TEMPLATE
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -176,57 +175,6 @@ def optimize_content_for_context_window(
return optimized_content return optimized_content
async def generate_document_summary(
content: str,
user_llm,
document_metadata: dict | None = None,
) -> tuple[str, list[float]]:
"""
Generate summary and embedding for document content with metadata.
Args:
content: Document content
user_llm: User's LLM instance
document_metadata: Optional metadata dictionary to include in summary
Returns:
Tuple of (enhanced_summary_content, summary_embedding)
"""
# Get model name from user_llm for token counting
model_name = getattr(user_llm, "model", "gpt-3.5-turbo") # Fallback to default
# Optimize content to fit within context window
optimized_content = optimize_content_for_context_window(
content, document_metadata, model_name
)
summary_chain = SUMMARY_PROMPT_TEMPLATE | user_llm
content_with_metadata = f"<DOCUMENT><DOCUMENT_METADATA>\n\n{document_metadata}\n\n</DOCUMENT_METADATA>\n\n<DOCUMENT_CONTENT>\n\n{optimized_content}\n\n</DOCUMENT_CONTENT></DOCUMENT>"
summary_result = await summary_chain.ainvoke({"document": content_with_metadata})
summary_content = summary_result.content
# Combine summary with metadata if provided
if document_metadata:
metadata_parts = []
metadata_parts.append("# DOCUMENT METADATA")
for key, value in document_metadata.items():
if value: # Only include non-empty values
formatted_key = key.replace("_", " ").title()
metadata_parts.append(f"**{formatted_key}:** {value}")
metadata_section = "\n".join(metadata_parts)
enhanced_summary_content = (
f"{metadata_section}\n\n# DOCUMENT SUMMARY\n\n{summary_content}"
)
else:
enhanced_summary_content = summary_content
summary_embedding = await asyncio.to_thread(embed_text, enhanced_summary_content)
return enhanced_summary_content, summary_embedding
async def create_document_chunks(content: str) -> list[Chunk]: async def create_document_chunks(content: str) -> list[Chunk]:
""" """
Create chunks from document content. Create chunks from document content.