Merge upstream/dev

This commit is contained in:
CREDO23 2026-06-05 19:18:12 +02:00
commit 8bdfd00a15
191 changed files with 3301 additions and 4079 deletions

View file

@ -9,7 +9,6 @@ from sqlalchemy.orm import selectinload
from app.celery_app import celery_app
from app.db import Document
from app.indexing_pipeline.adapters.file_upload_adapter import UploadDocumentAdapter
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
@ -68,12 +67,8 @@ async def _reindex_document(document_id: int, user_id: str):
logger.info(f"Reindexing document {document_id} ({document.title})")
user_llm = await get_user_long_context_llm(
session, user_id, document.search_space_id
)
adapter = UploadDocumentAdapter(session)
await adapter.reindex(document=document, llm=user_llm)
await adapter.reindex(document=document)
await task_logger.log_task_success(
log_entry,

View file

@ -765,7 +765,6 @@ def process_file_upload_with_document_task(
filename: str,
search_space_id: int,
user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False,
processing_mode: str = "basic",
):
@ -782,7 +781,6 @@ def process_file_upload_with_document_task(
filename: Original filename
search_space_id: ID of the search space
user_id: ID of the user
should_summarize: Whether to generate an LLM summary
"""
import traceback
@ -814,7 +812,6 @@ def process_file_upload_with_document_task(
filename,
search_space_id,
user_id,
should_summarize=should_summarize,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
)
@ -850,7 +847,6 @@ async def _process_file_with_document(
filename: str,
search_space_id: int,
user_id: str,
should_summarize: bool = False,
use_vision_llm: bool = False,
processing_mode: str = "basic",
):
@ -954,7 +950,6 @@ async def _process_file_with_document(
task_logger=task_logger,
log_entry=log_entry,
notification=notification,
should_summarize=should_summarize,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
)
@ -1258,7 +1253,6 @@ def index_local_folder_task(
exclude_patterns: list[str] | None = None,
file_extensions: list[str] | None = None,
root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_paths: list[str] | None = None,
):
"""Celery task to index a local folder. Config is passed directly — no connector row."""
@ -1271,7 +1265,6 @@ def index_local_folder_task(
exclude_patterns=exclude_patterns,
file_extensions=file_extensions,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
target_file_paths=target_file_paths,
)
)
@ -1285,7 +1278,6 @@ async def _index_local_folder_async(
exclude_patterns: list[str] | None = None,
file_extensions: list[str] | None = None,
root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_paths: list[str] | None = None,
):
"""Run local folder indexing with notification + heartbeat."""
@ -1343,8 +1335,7 @@ async def _index_local_folder_async(
exclude_patterns=exclude_patterns,
file_extensions=file_extensions,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
target_file_paths=target_file_paths,
target_file_paths=target_file_paths,
on_heartbeat_callback=_heartbeat_progress
if (is_batch or is_full_scan)
else None,
@ -1400,7 +1391,6 @@ def index_uploaded_folder_files_task(
user_id: str,
folder_name: str,
root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict],
use_vision_llm: bool = False,
processing_mode: str = "basic",
@ -1412,7 +1402,6 @@ def index_uploaded_folder_files_task(
user_id=user_id,
folder_name=folder_name,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
file_mappings=file_mappings,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
@ -1425,7 +1414,6 @@ async def _index_uploaded_folder_files_async(
user_id: str,
folder_name: str,
root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict],
use_vision_llm: bool = False,
processing_mode: str = "basic",
@ -1475,8 +1463,7 @@ async def _index_uploaded_folder_files_async(
user_id=user_id,
folder_name=folder_name,
root_folder_id=root_folder_id,
enable_summary=enable_summary,
file_mappings=file_mappings,
file_mappings=file_mappings,
on_heartbeat_callback=_heartbeat_progress,
use_vision_llm=use_vision_llm,
processing_mode=processing_mode,
@ -1563,12 +1550,10 @@ async def _ai_sort_search_space_async(search_space_id: int, user_id: str):
t_start = time.perf_counter()
try:
from app.services.ai_file_sort_service import ai_sort_all_documents
from app.services.llm_service import get_document_summary_llm
from app.services.llm_service import get_agent_llm
async with get_celery_session_maker()() as session:
llm = await get_document_summary_llm(
session, search_space_id, disable_streaming=True
)
llm = await get_agent_llm(session, search_space_id, disable_streaming=True)
if llm is None:
logger.warning(
"No LLM configured for search_space=%d, skipping AI sort",
@ -1604,7 +1589,7 @@ def ai_sort_document_task(self, search_space_id: int, user_id: str, document_id:
async def _ai_sort_document_async(search_space_id: int, user_id: str, document_id: int):
from app.db import Document
from app.services.ai_file_sort_service import ai_sort_document
from app.services.llm_service import get_document_summary_llm
from app.services.llm_service import get_agent_llm
async with get_celery_session_maker()() as session:
document = await session.get(Document, document_id)
@ -1612,9 +1597,7 @@ async def _ai_sort_document_async(search_space_id: int, user_id: str, document_i
logger.warning("Document %d not found, skipping AI sort", document_id)
return
llm = await get_document_summary_llm(
session, search_space_id, disable_streaming=True
)
llm = await get_agent_llm(session, search_space_id, disable_streaming=True)
if llm is None:
logger.warning(
"No LLM for search_space=%d, skipping AI sort of doc=%d",

View file

@ -114,8 +114,8 @@ async def build_new_chat_input_state(
final_query = _render_query_with_context(
agent_user_query=agent_user_query,
recent_reports=recent_reports,
mentioned_connectors=mentioned_connectors,
recent_reports=recent_reports,
)
if thread_visibility == ChatVisibility.SEARCH_SPACE and current_user_display_name:
@ -198,44 +198,11 @@ async def _resolve_mentions_for_query(
return agent_user_query, accepted_folder_ids
def _render_connector_block(mentioned_connectors: list[dict[str, Any]]) -> str | None:
"""Render the ``<mentioned_connectors>`` block, or ``None`` when empty.
Malformed entries (non-dict, or missing id/type) are skipped.
"""
connector_lines: list[str] = []
for connector in mentioned_connectors:
if not isinstance(connector, dict):
continue
connector_id = connector.get("id")
connector_type = connector.get("connector_type") or connector.get(
"document_type"
)
account_name = connector.get("account_name") or connector.get("title")
if connector_id is None or connector_type is None:
continue
connector_lines.append(
f' - connector_id={connector_id}, connector_type="{connector_type}", '
f'account_name="{account_name or ""}"'
)
if not connector_lines:
return None
return (
"<mentioned_connectors>\n"
"The user selected these exact connector accounts with @. "
"These entries are selection metadata, not retrieved connector content. "
"When a connector-backed tool needs an account, use the matching "
"connector_id from this list if the tool supports connector_id:\n"
+ "\n".join(connector_lines)
+ "\n</mentioned_connectors>"
)
def _render_query_with_context(
*,
agent_user_query: str,
recent_reports: list[Report],
mentioned_connectors: list[dict[str, Any]] | None,
recent_reports: list[Report],
) -> str:
"""Prepend the ``<mentioned_connectors>`` then ``<report_context>`` blocks.
@ -243,10 +210,9 @@ def _render_query_with_context(
"""
context_parts: list[str] = []
if mentioned_connectors:
connector_block = _render_connector_block(mentioned_connectors)
if connector_block:
context_parts.append(connector_block)
connector_context = _render_mentioned_connectors(mentioned_connectors)
if connector_context:
context_parts.append(connector_context)
if recent_reports:
report_lines: list[str] = []
@ -272,3 +238,40 @@ def _render_query_with_context(
return f"{context}\n\n<user_query>{agent_user_query}</user_query>"
return agent_user_query
def _render_mentioned_connectors(
mentioned_connectors: list[dict[str, Any]] | None,
) -> str | None:
"""Render selected connector account metadata for connector-backed tools."""
if not mentioned_connectors:
return None
connector_lines: list[str] = []
for connector in mentioned_connectors:
if not isinstance(connector, dict):
continue
connector_id = connector.get("id")
connector_type = connector.get("connector_type") or connector.get(
"document_type"
)
account_name = connector.get("account_name") or connector.get("title")
if connector_id is None or connector_type is None:
continue
connector_lines.append(
f' - connector_id={connector_id}, connector_type="{connector_type}", '
f'account_name="{account_name or ""}"'
)
if not connector_lines:
return None
return (
"<mentioned_connectors>\n"
"The user selected these exact connector accounts with @. "
"These entries are selection metadata, not retrieved connector content. "
"When a connector-backed tool needs an account, use the matching "
"connector_id from this list if the tool supports connector_id:\n"
+ "\n".join(connector_lines)
+ "\n</mentioned_connectors>"
)

View file

@ -14,13 +14,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.airtable_history import AirtableHistoryConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
@ -394,29 +392,10 @@ async def index_airtable_records(
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
# Heavy processing (embeddings, chunks)
if user_llm and connector.enable_summary:
document_metadata_for_summary = {
"record_id": item["record_id"],
"created_time": item["record"].get("CREATED_TIME()", ""),
"document_type": "Airtable Record",
"connector_type": "Airtable",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["markdown_content"],
user_llm,
document_metadata_for_summary,
)
else:
summary_content = f"Airtable Record: {item['record_id']}\n\n{item['markdown_content']}"
summary_embedding = embed_text(summary_content)
summary_content = f"Airtable Record: {item['record_id']}\n\n{item['markdown_content']}"
summary_embedding = embed_text(summary_content)
chunks = await create_document_chunks(item["markdown_content"])

View file

@ -15,13 +15,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.bookstack_connector import BookStackConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
@ -384,10 +382,7 @@ async def index_bookstack_pages(
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
# Heavy processing (embeddings, chunks)
# Build document metadata
doc_metadata = {
@ -403,23 +398,8 @@ async def index_bookstack_pages(
"connector_id": connector_id,
}
if user_llm and connector.enable_summary:
summary_metadata = {
"page_name": item["page_name"],
"page_id": item["page_id"],
"book_id": item["book_id"],
"document_type": "BookStack Page",
"connector_type": "BookStack",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["full_content"], user_llm, summary_metadata
)
else:
summary_content = f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n{item['full_content']}"
summary_embedding = embed_text(summary_content)
summary_content = f"BookStack Page: {item['page_name']}\n\nBook ID: {item['book_id']}\n\n{item['full_content']}"
summary_embedding = embed_text(summary_content)
# Process chunks - using the full page content
chunks = await create_document_chunks(item["full_content"])

View file

@ -16,13 +16,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.clickup_history import ClickUpHistoryConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
@ -393,32 +391,10 @@ async def index_clickup_tasks(
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
# Heavy processing (embeddings, chunks)
if user_llm and connector.enable_summary:
document_metadata_for_summary = {
"task_id": item["task_id"],
"task_name": item["task_name"],
"task_status": item["task_status"],
"task_priority": item["task_priority"],
"task_list": item["task_list_name"],
"task_space": item["task_space_name"],
"assignees": len(item["task_assignees"]),
"document_type": "ClickUp Task",
"connector_type": "ClickUp",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["task_content"], user_llm, document_metadata_for_summary
)
else:
summary_content = item["task_content"]
summary_embedding = embed_text(item["task_content"])
summary_content = item["task_content"]
summary_embedding = embed_text(item["task_content"])
chunks = await create_document_chunks(item["task_content"])

View file

@ -14,7 +14,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from .base import (
@ -36,7 +35,6 @@ def _build_connector_doc(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Map a raw Confluence page dict to a ConnectorDocument."""
page_id = page.get("id", "")
@ -54,10 +52,6 @@ def _build_connector_doc(
"connector_type": "Confluence",
}
fallback_summary = (
f"Confluence Page: {page_title}\n\nSpace ID: {space_id}\n\n{full_content}"
)
return ConnectorDocument(
title=page_title,
source_markdown=full_content,
@ -66,8 +60,6 @@ def _build_connector_doc(
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
@ -268,8 +260,7 @@ async def index_confluence_pages(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
)
with session.no_autoflush:
duplicate_by_content = await check_duplicate_document_by_hash(
@ -297,12 +288,8 @@ async def index_confluence_pages(
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s: AsyncSession):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,

View file

@ -27,7 +27,6 @@ from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnector
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.page_limit_service import PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
@ -126,7 +125,6 @@ def _build_connector_doc(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
file_id = file.get("id", "")
file_name = file.get("name", "Unknown")
@ -138,8 +136,6 @@ def _build_connector_doc(
"connector_type": "Dropbox",
}
fallback_summary = f"File: {file_name}\n\n{markdown[:4000]}"
return ConnectorDocument(
title=file_name,
source_markdown=markdown,
@ -148,8 +144,6 @@ def _build_connector_doc(
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
@ -161,7 +155,6 @@ async def _download_files_parallel(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
@ -191,7 +184,6 @@ async def _download_files_parallel(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
async with hb_lock:
completed_count += 1
@ -223,7 +215,6 @@ async def _download_and_index(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int]:
@ -234,7 +225,6 @@ async def _download_and_index(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
@ -243,13 +233,8 @@ async def _download_and_index(
batch_failed = 0
if connector_docs:
pipeline = IndexingPipelineService(session)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat,
)
@ -289,7 +274,6 @@ async def _index_with_delta_sync(
log_entry: object,
max_files: int,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int, str]:
"""Delta sync using Dropbox cursor-based change tracking.
@ -361,7 +345,6 @@ async def _index_with_delta_sync(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
@ -388,7 +371,6 @@ async def _index_full_scan(
include_subfolders: bool = True,
incremental_sync: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
@ -473,7 +455,6 @@ async def _index_full_scan(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
@ -502,7 +483,6 @@ async def _index_selected_files(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
incremental_sync: bool = True,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
@ -563,7 +543,6 @@ async def _index_selected_files(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
@ -629,7 +608,6 @@ async def index_dropbox_files(
)
return 0, 0, error_msg, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
@ -664,7 +642,6 @@ async def index_dropbox_files(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector_enable_summary,
incremental_sync=incremental_sync,
vision_llm=vision_llm,
)
@ -700,7 +677,6 @@ async def index_dropbox_files(
task_logger,
log_entry,
max_files,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
folder_cursors[folder_path] = new_cursor
@ -720,7 +696,6 @@ async def index_dropbox_files(
max_files,
include_subfolders,
incremental_sync=incremental_sync,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_unsupported += unsup

View file

@ -18,13 +18,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.github_connector import GitHubConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
@ -351,42 +349,14 @@ async def index_github_repos(
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
# Heavy processing (embeddings, chunks)
summary_text = (
f"# GitHub Repository: {repo_full_name}\n\n"
f"## Summary\n{digest.summary}\n\n"
f"## File Structure\n{digest.tree}"
)
document_metadata_for_summary = {
"repository": repo_full_name,
"document_type": "GitHub Repository",
"connector_type": "GitHub",
"ingestion_method": "gitingest",
"file_tree": digest.tree[:2000]
if len(digest.tree) > 2000
else digest.tree,
"estimated_tokens": digest.estimated_tokens,
}
if user_llm and connector.enable_summary:
# Prepare content for summarization
summary_content = digest.full_digest
if len(summary_content) > MAX_DIGEST_CHARS:
summary_content = (
f"# Repository: {repo_full_name}\n\n"
f"## File Structure\n\n{digest.tree}\n\n"
f"## File Contents (truncated)\n\n{digest.content[: MAX_DIGEST_CHARS - len(digest.tree) - 200]}..."
)
summary_text, summary_embedding = await generate_document_summary(
summary_content, user_llm, document_metadata_for_summary
)
else:
summary_text = (
f"# GitHub Repository: {repo_full_name}\n\n"
f"## Summary\n{digest.summary}\n\n"
f"## File Structure\n{digest.tree}"
)
summary_embedding = embed_text(summary_text)
summary_embedding = embed_text(summary_text)
# Chunk the full digest content for granular search
try:

View file

@ -2,7 +2,7 @@
Google Calendar connector indexer.
Uses the shared IndexingPipelineService for document deduplication,
summarization, chunking, and embedding.
chunking, and embedding.
"""
from collections.abc import Awaitable, Callable
@ -21,7 +21,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
PlaceholderInfo,
)
from app.services.composio_service import ComposioService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
@ -53,7 +52,6 @@ def _build_connector_doc(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Map a raw Google Calendar API event dict to a ConnectorDocument."""
event_id = event.get("id", "")
@ -78,8 +76,6 @@ def _build_connector_doc(
"connector_type": "Google Calendar",
}
fallback_summary = f"Google Calendar Event: {event_summary}\n\n{event_markdown}"
return ConnectorDocument(
title=event_summary,
source_markdown=event_markdown,
@ -88,8 +84,6 @@ def _build_connector_doc(
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
@ -420,8 +414,7 @@ async def index_google_calendar_events(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
)
with session.no_autoflush:
duplicate = await check_duplicate_document_by_hash(
@ -448,13 +441,8 @@ async def index_google_calendar_events(
# ── Pipeline: migrate legacy docs + parallel index ─────────────
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,

View file

@ -40,7 +40,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
PlaceholderInfo,
)
from app.services.composio_service import ComposioService
from app.services.llm_service import get_user_long_context_llm
from app.services.page_limit_service import PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
@ -381,7 +380,6 @@ def _build_connector_doc(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Build a ConnectorDocument from Drive file metadata + extracted markdown."""
file_id = file.get("id", "")
@ -394,8 +392,6 @@ def _build_connector_doc(
"connector_type": "Google Drive",
}
fallback_summary = f"File: {file_name}\n\n{markdown[:4000]}"
return ConnectorDocument(
title=file_name,
source_markdown=markdown,
@ -404,8 +400,6 @@ def _build_connector_doc(
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
@ -461,7 +455,6 @@ async def _download_files_parallel(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
@ -494,7 +487,6 @@ async def _download_files_parallel(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
async with hb_lock:
completed_count += 1
@ -525,7 +517,6 @@ async def _process_single_file(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Download, extract, and index a single Drive file via the pipeline.
@ -561,8 +552,7 @@ async def _process_single_file(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
)
pipeline = IndexingPipelineService(session)
documents = await pipeline.prepare_for_indexing([doc])
@ -578,10 +568,7 @@ async def _process_single_file(
connector_doc = doc_map.get(document.unique_identifier_hash)
if not connector_doc:
continue
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
await pipeline.index(document, connector_doc, user_llm)
await pipeline.index(document, connector_doc)
await page_limit_service.update_page_usage(
user_id, estimated_pages, allow_exceed=True
@ -636,7 +623,6 @@ async def _download_and_index(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int]:
@ -650,7 +636,6 @@ async def _download_and_index(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
@ -659,13 +644,8 @@ async def _download_and_index(
batch_failed = 0
if connector_docs:
pipeline = IndexingPipelineService(session)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat,
)
@ -681,7 +661,6 @@ async def _index_selected_files(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int, int, list[str]]:
@ -746,7 +725,6 @@ async def _index_selected_files(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
@ -781,7 +759,6 @@ async def _index_full_scan(
max_files: int,
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
@ -911,7 +888,6 @@ async def _index_full_scan(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
@ -946,7 +922,6 @@ async def _index_with_delta_sync(
max_files: int,
include_subfolders: bool = False,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Delta sync using change tracking.
@ -1054,7 +1029,6 @@ async def _index_with_delta_sync(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
@ -1142,7 +1116,6 @@ async def index_google_drive_files(
)
return 0, 0, client_error, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
@ -1189,7 +1162,6 @@ async def index_google_drive_files(
max_files,
include_subfolders,
on_heartbeat_callback,
connector_enable_summary,
vision_llm=vision_llm,
)
documents_unsupported += du
@ -1208,7 +1180,6 @@ async def index_google_drive_files(
max_files,
include_subfolders,
on_heartbeat_callback,
connector_enable_summary,
vision_llm=vision_llm,
)
documents_indexed += ri
@ -1234,7 +1205,6 @@ async def index_google_drive_files(
max_files,
include_subfolders,
on_heartbeat_callback,
connector_enable_summary,
vision_llm=vision_llm,
)
@ -1346,7 +1316,6 @@ async def index_google_drive_single_file(
)
return 0, client_error
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
@ -1370,7 +1339,6 @@ async def index_google_drive_single_file(
connector_id,
search_space_id,
user_id,
connector_enable_summary,
vision_llm=vision_llm,
)
await session.commit()
@ -1467,7 +1435,6 @@ async def index_google_drive_selected_files(
)
return 0, 0, [error_msg]
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
@ -1481,7 +1448,6 @@ async def index_google_drive_selected_files(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector_enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)

View file

@ -2,7 +2,7 @@
Google Gmail connector indexer.
Uses the shared IndexingPipelineService for document deduplication,
summarization, chunking, and embedding.
chunking, and embedding.
"""
from collections.abc import Awaitable, Callable
@ -21,7 +21,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
PlaceholderInfo,
)
from app.services.composio_service import ComposioService
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.google_credentials import COMPOSIO_GOOGLE_CONNECTOR_TYPES
@ -105,7 +104,6 @@ def _build_connector_doc(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Map a raw Gmail API message dict to a ConnectorDocument."""
message_id = message.get("id", "")
@ -138,12 +136,6 @@ def _build_connector_doc(
"connector_type": "Google Gmail",
}
fallback_summary = (
f"Google Gmail Message: {subject}\n\n"
f"From: {sender}\nDate: {date_str}\n\n"
f"{markdown_content}"
)
return ConnectorDocument(
title=subject,
source_markdown=markdown_content,
@ -152,8 +144,6 @@ def _build_connector_doc(
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
@ -454,8 +444,7 @@ async def index_google_gmail_messages(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
)
with session.no_autoflush:
duplicate = await check_duplicate_document_by_hash(
@ -483,13 +472,8 @@ async def index_google_gmail_messages(
# ── Pipeline: migrate legacy docs + parallel index ─────────────
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,

View file

@ -2,7 +2,7 @@
Linear connector indexer.
Uses the shared IndexingPipelineService for document deduplication,
summarization, chunking, and embedding with bounded parallel indexing.
chunking, and embedding with bounded parallel indexing.
"""
from collections.abc import Awaitable, Callable
@ -18,7 +18,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from .base import (
@ -41,7 +40,6 @@ def _build_connector_doc(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Map a raw Linear issue dict to a ConnectorDocument."""
issue_id = issue.get("id", "")
@ -63,11 +61,6 @@ def _build_connector_doc(
"connector_type": "Linear",
}
fallback_summary = (
f"Linear Issue {issue_identifier}: {issue_title}\n\n"
f"Status: {state}\n\n{issue_content}"
)
return ConnectorDocument(
title=f"{issue_identifier}: {issue_title}",
source_markdown=issue_content,
@ -76,8 +69,6 @@ def _build_connector_doc(
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
@ -277,8 +268,7 @@ async def index_linear_issues(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
)
with session.no_autoflush:
duplicate = await check_duplicate_document_by_hash(
@ -306,13 +296,8 @@ async def index_linear_issues(
# ── Pipeline: migrate legacy docs + parallel index ────────────
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,

View file

@ -33,7 +33,6 @@ from app.db import (
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.page_limit_service import PageLimitExceededError, PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.celery_tasks import get_celery_session_maker
@ -478,7 +477,6 @@ def _build_connector_doc(
*,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Build a ConnectorDocument from a local file's extracted content."""
unique_id = f"{folder_name}:{relative_path}"
@ -488,7 +486,6 @@ def _build_connector_doc(
"document_type": "Local Folder File",
"connector_type": "Local Folder",
}
fallback_summary = f"File: {title}\n\n{content[:4000]}"
return ConnectorDocument(
title=title,
@ -498,8 +495,6 @@ def _build_connector_doc(
search_space_id=search_space_id,
connector_id=None,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
@ -513,7 +508,6 @@ async def index_local_folder(
exclude_patterns: list[str] | None = None,
file_extensions: list[str] | None = None,
root_folder_id: int | None = None,
enable_summary: bool = False,
target_file_paths: list[str] | None = None,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, int | None, str | None]:
@ -574,8 +568,7 @@ async def index_local_folder(
folder_path=folder_path,
folder_name=folder_name,
target_file_path=target_file_paths[0],
enable_summary=enable_summary,
root_folder_id=root_folder_id,
root_folder_id=root_folder_id,
task_logger=task_logger,
log_entry=log_entry,
)
@ -587,8 +580,7 @@ async def index_local_folder(
folder_path=folder_path,
folder_name=folder_name,
target_file_paths=target_file_paths,
enable_summary=enable_summary,
root_folder_id=root_folder_id,
root_folder_id=root_folder_id,
on_progress_callback=on_heartbeat_callback,
)
if err:
@ -774,8 +766,7 @@ async def index_local_folder(
folder_name=folder_name,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
)
connector_docs.append(doc)
file_meta_map[unique_identifier] = {
"relative_path": relative_path,
@ -845,15 +836,13 @@ async def index_local_folder(
doc_map = {compute_unique_identifier_hash(cd): cd for cd in connector_docs}
documents = await pipeline.prepare_for_indexing(connector_docs)
llm = await get_user_long_context_llm(session, user_id, search_space_id)
for document in documents:
connector_doc = doc_map.get(document.unique_identifier_hash)
if connector_doc is None:
failed_count += 1
continue
result = await pipeline.index(document, connector_doc, llm)
result = await pipeline.index(document, connector_doc)
if DocumentStatus.is_state(result.status, DocumentStatus.READY):
indexed_count += 1
@ -960,7 +949,6 @@ async def _index_batch_files(
folder_path: str,
folder_name: str,
target_file_paths: list[str],
enable_summary: bool,
root_folder_id: int | None,
on_progress_callback: HeartbeatCallbackType | None = None,
) -> tuple[int, int, str | None]:
@ -995,8 +983,7 @@ async def _index_batch_files(
folder_path=folder_path,
folder_name=folder_name,
target_file_path=file_path,
enable_summary=enable_summary,
root_folder_id=root_folder_id,
root_folder_id=root_folder_id,
task_logger=task_logger,
log_entry=log_entry,
)
@ -1036,7 +1023,6 @@ async def _index_single_file(
folder_path: str,
folder_name: str,
target_file_path: str,
enable_summary: bool,
root_folder_id: int | None,
task_logger,
log_entry,
@ -1125,8 +1111,7 @@ async def _index_single_file(
folder_name=folder_name,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
)
if root_folder_id:
connector_doc.folder_id = await _resolve_folder_for_file(
@ -1134,7 +1119,6 @@ async def _index_single_file(
)
pipeline = IndexingPipelineService(session)
llm = await get_user_long_context_llm(session, user_id, search_space_id)
documents = await pipeline.prepare_for_indexing([connector_doc])
if not documents:
@ -1142,7 +1126,7 @@ async def _index_single_file(
db_doc = documents[0]
await pipeline.index(db_doc, connector_doc, llm)
await pipeline.index(db_doc, connector_doc)
await session.refresh(db_doc)
doc_meta = dict(db_doc.document_metadata or {})
@ -1275,7 +1259,6 @@ async def index_uploaded_files(
user_id: str,
folder_name: str,
root_folder_id: int,
enable_summary: bool,
file_mappings: list[dict],
on_heartbeat_callback: HeartbeatCallbackType | None = None,
use_vision_llm: bool = False,
@ -1318,7 +1301,6 @@ async def index_uploaded_files(
page_limit_service = PageLimitService(session)
pipeline = IndexingPipelineService(session)
llm = await get_user_long_context_llm(session, user_id, search_space_id)
vision_llm_instance = None
if use_vision_llm:
@ -1414,8 +1396,7 @@ async def index_uploaded_files(
folder_name=folder_name,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
)
connector_doc.folder_id = await _resolve_folder_for_file(
session,
@ -1432,7 +1413,7 @@ async def index_uploaded_files(
db_doc = documents[0]
await pipeline.index(db_doc, connector_doc, llm)
await pipeline.index(db_doc, connector_doc)
await session.refresh(db_doc)
doc_meta = dict(db_doc.document_metadata or {})

View file

@ -16,13 +16,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.luma_connector import LumaConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
@ -437,38 +435,14 @@ async def index_luma_events(
document.status = DocumentStatus.processing()
await session.commit()
# Heavy processing (LLM, embeddings, chunks)
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
# Heavy processing (embeddings, chunks)
if user_llm and connector.enable_summary:
document_metadata_for_summary = {
"event_id": item["event_id"],
"event_name": item["event_name"],
"event_url": item["event_url"],
"start_at": item["start_at"],
"end_at": item["end_at"],
"timezone": item["timezone"],
"location": item["location"] or "No location",
"city": item["city"],
"hosts": item["host_names"],
"document_type": "Luma Event",
"connector_type": "Luma",
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
item["event_markdown"], user_llm, document_metadata_for_summary
)
else:
summary_content = (
f"Luma Event: {item['event_name']}\n\n{item['event_markdown']}"
)
summary_embedding = await asyncio.to_thread(
embed_text, summary_content
)
summary_content = (
f"Luma Event: {item['event_name']}\n\n{item['event_markdown']}"
)
summary_embedding = await asyncio.to_thread(
embed_text, summary_content
)
chunks = await create_document_chunks(item["event_markdown"])

View file

@ -2,7 +2,7 @@
Notion connector indexer.
Uses the shared IndexingPipelineService for document deduplication,
summarization, chunking, and embedding with bounded parallel indexing.
chunking, and embedding with bounded parallel indexing.
"""
from collections.abc import Awaitable, Callable
@ -19,7 +19,6 @@ from app.indexing_pipeline.indexing_pipeline_service import (
IndexingPipelineService,
PlaceholderInfo,
)
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.notion_utils import process_blocks
@ -43,7 +42,6 @@ def _build_connector_doc(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
"""Map a raw Notion page dict to a ConnectorDocument."""
page_id = page.get("page_id", "")
@ -57,8 +55,6 @@ def _build_connector_doc(
"connector_type": "Notion",
}
fallback_summary = f"Notion Page: {page_title}\n\n{markdown_content}"
return ConnectorDocument(
title=page_title,
source_markdown=markdown_content,
@ -67,8 +63,6 @@ def _build_connector_doc(
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
@ -314,8 +308,7 @@ async def index_notion_pages(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector.enable_summary,
)
)
with session.no_autoflush:
duplicate = await check_duplicate_document_by_hash(
@ -343,13 +336,8 @@ async def index_notion_pages(
# ── Pipeline: migrate legacy docs + parallel index ────────────
await pipeline.migrate_legacy_docs(connector_docs)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, documents_indexed, documents_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat_callback,
heartbeat_interval=HEARTBEAT_INTERVAL_SECONDS,

View file

@ -27,7 +27,6 @@ from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnector
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
from app.services.llm_service import get_user_long_context_llm
from app.services.page_limit_service import PageLimitService
from app.services.task_logging_service import TaskLoggingService
from app.tasks.connector_indexers.base import (
@ -133,7 +132,6 @@ def _build_connector_doc(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
) -> ConnectorDocument:
file_id = file.get("id", "")
file_name = file.get("name", "Unknown")
@ -145,8 +143,6 @@ def _build_connector_doc(
"connector_type": "OneDrive",
}
fallback_summary = f"File: {file_name}\n\n{markdown[:4000]}"
return ConnectorDocument(
title=file_name,
source_markdown=markdown,
@ -155,8 +151,6 @@ def _build_connector_doc(
search_space_id=search_space_id,
connector_id=connector_id,
created_by_id=user_id,
should_summarize=enable_summary,
fallback_summary=fallback_summary,
metadata=metadata,
)
@ -168,7 +162,6 @@ async def _download_files_parallel(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
max_concurrency: int = 3,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
@ -198,7 +191,6 @@ async def _download_files_parallel(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
)
async with hb_lock:
completed_count += 1
@ -230,7 +222,6 @@ async def _download_and_index(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int]:
@ -241,7 +232,6 @@ async def _download_and_index(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
@ -250,13 +240,8 @@ async def _download_and_index(
batch_failed = 0
if connector_docs:
pipeline = IndexingPipelineService(session)
async def _get_llm(s):
return await get_user_long_context_llm(s, user_id, search_space_id)
_, batch_indexed, batch_failed = await pipeline.index_batch_parallel(
connector_docs,
_get_llm,
max_concurrency=3,
on_heartbeat=on_heartbeat,
)
@ -294,7 +279,6 @@ async def _index_selected_files(
connector_id: int,
search_space_id: int,
user_id: str,
enable_summary: bool,
on_heartbeat: HeartbeatCallbackType | None = None,
vision_llm=None,
) -> tuple[int, int, int, list[str]]:
@ -345,7 +329,6 @@ async def _index_selected_files(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat,
vision_llm=vision_llm,
)
@ -379,7 +362,6 @@ async def _index_full_scan(
max_files: int,
include_subfolders: bool = True,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int]:
"""Full scan indexing of a folder.
@ -454,7 +436,6 @@ async def _index_full_scan(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
@ -487,7 +468,6 @@ async def _index_with_delta_sync(
log_entry: object,
max_files: int,
on_heartbeat_callback: HeartbeatCallbackType | None = None,
enable_summary: bool = True,
vision_llm=None,
) -> tuple[int, int, int, str | None]:
"""Delta sync using OneDrive change tracking.
@ -579,7 +559,6 @@ async def _index_with_delta_sync(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=enable_summary,
on_heartbeat=on_heartbeat_callback,
vision_llm=vision_llm,
)
@ -651,7 +630,6 @@ async def index_onedrive_files(
)
return 0, 0, error_msg, 0
connector_enable_summary = getattr(connector, "enable_summary", True)
connector_enable_vision_llm = getattr(connector, "enable_vision_llm", False)
vision_llm = None
if connector_enable_vision_llm:
@ -681,7 +659,6 @@ async def index_onedrive_files(
connector_id=connector_id,
search_space_id=search_space_id,
user_id=user_id,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_indexed += indexed
@ -711,7 +688,6 @@ async def index_onedrive_files(
task_logger,
log_entry,
max_files,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_indexed += indexed
@ -738,7 +714,6 @@ async def index_onedrive_files(
log_entry,
max_files,
include_subfolders,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_indexed += ri
@ -758,7 +733,6 @@ async def index_onedrive_files(
log_entry,
max_files,
include_subfolders,
enable_summary=connector_enable_summary,
vision_llm=vision_llm,
)
total_indexed += indexed

View file

@ -15,13 +15,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.connectors.webcrawler_connector import WebCrawlerConnector
from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.webcrawler_utils import parse_webcrawler_urls
@ -372,29 +370,10 @@ async def index_crawled_urls(
documents_skipped += 1
continue
# Generate summary with LLM
user_llm = await get_user_long_context_llm(
session, user_id, search_space_id
)
# Select deterministic document content
if user_llm and connector.enable_summary:
document_metadata_for_summary = {
"url": url,
"title": title,
"description": description,
"language": language,
"document_type": "Crawled URL",
"crawler_type": crawler_type,
}
(
summary_content,
summary_embedding,
) = await generate_document_summary(
structured_document, user_llm, document_metadata_for_summary
)
else:
summary_content = f"Crawled URL: {title}\n\nURL: {url}\n\n{content}"
summary_embedding = embed_text(summary_content)
summary_content = f"Crawled URL: {title}\n\nURL: {url}\n\n{content}"
summary_embedding = embed_text(summary_content)
# Process chunks
chunks = await create_document_chunks(content)

View file

@ -1,20 +1,15 @@
"""
Unified document save/update logic for file processors.
"""
"""Unified document save/update logic for file processors."""
import asyncio
import logging
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
)
from ._helpers import (
@ -24,59 +19,6 @@ from ._helpers import (
)
from .base import get_current_timestamp, safe_set_chunks
# ---------------------------------------------------------------------------
# Summary generation
# ---------------------------------------------------------------------------
async def _generate_summary(
markdown_content: str,
file_name: str,
etl_service: str,
user_llm,
enable_summary: bool,
) -> tuple[str, list[float]]:
"""
Generate a document summary and embedding.
Docling uses its own large-document summary strategy; other ETL services
use the standard ``generate_document_summary`` helper.
"""
if not enable_summary:
summary = f"File: {file_name}\n\n{markdown_content[:4000]}"
return summary, await asyncio.to_thread(embed_text, summary)
if etl_service == "DOCLING":
from app.services.docling_service import create_docling_service
docling_service = create_docling_service()
summary_text = await docling_service.process_large_document_summary(
content=markdown_content, llm=user_llm, document_title=file_name
)
meta = {
"file_name": file_name,
"etl_service": etl_service,
"document_type": "File Document",
}
parts = ["# DOCUMENT METADATA"]
for key, value in meta.items():
if value:
formatted_key = key.replace("_", " ").title()
parts.append(f"**{formatted_key}:** {value}")
enhanced = "\n".join(parts) + "\n\n# DOCUMENT SUMMARY\n\n" + summary_text
return enhanced, await asyncio.to_thread(embed_text, enhanced)
# Standard summary (Unstructured / LlamaCloud / others)
meta = {
"file_name": file_name,
"etl_service": etl_service,
"document_type": "File Document",
}
return await generate_document_summary(markdown_content, user_llm, meta)
# ---------------------------------------------------------------------------
# Unified save function
# ---------------------------------------------------------------------------
@ -90,7 +32,6 @@ async def save_file_document(
user_id: str,
etl_service: str,
connector: dict | None = None,
enable_summary: bool = True,
) -> Document | None:
"""
Process and store a file document with deduplication and migration support.
@ -106,7 +47,6 @@ async def save_file_document(
user_id: ID of the user
etl_service: Name of the ETL service (UNSTRUCTURED, LLAMACLOUD, DOCLING)
connector: Optional connector info for Google Drive files
enable_summary: Whether to generate an AI summary
Returns:
Document object if successful, None if duplicate detected
@ -133,24 +73,16 @@ async def save_file_document(
if should_skip:
return doc
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} "
f"in search space {search_space_id}"
)
summary_content, summary_embedding = await _generate_summary(
markdown_content, file_name, etl_service, user_llm, enable_summary
)
document_content = f"File: {file_name}\n\n{markdown_content[:4000]}"
document_embedding = embed_text(document_content)
chunks = await create_document_chunks(markdown_content)
doc_metadata = {"FILE_NAME": file_name, "ETL_SERVICE": etl_service}
if existing_document:
existing_document.title = file_name
existing_document.content = summary_content
existing_document.content = document_content
existing_document.content_hash = content_hash
existing_document.embedding = summary_embedding
existing_document.embedding = document_embedding
existing_document.document_metadata = doc_metadata
await safe_set_chunks(session, existing_document, chunks)
existing_document.source_markdown = markdown_content
@ -171,8 +103,8 @@ async def save_file_document(
title=file_name,
document_type=doc_type,
document_metadata=doc_metadata,
content=summary_content,
embedding=summary_embedding,
content=document_content,
embedding=document_embedding,
chunks=chunks,
content_hash=content_hash,
unique_identifier_hash=primary_hash,

View file

@ -25,11 +25,10 @@ from app.db import (
SearchSourceConnectorType,
SearchSpace,
)
from app.services.llm_service import get_document_summary_llm
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
@ -176,34 +175,8 @@ async def add_circleback_meeting_document(
# PHASE 3: Process the document content
# =======================================================================
# Get LLM for generating summary
llm = await get_document_summary_llm(session, search_space_id)
if not llm:
logger.warning(
f"No LLM configured for search space {search_space_id}. Using content as summary."
)
# Use first 1000 chars as summary if no LLM available
summary_content = (
markdown_content[:1000] + "..."
if len(markdown_content) > 1000
else markdown_content
)
summary_embedding = None
else:
# Generate summary with metadata
summary_metadata = {
"meeting_name": meeting_name,
"meeting_id": meeting_id,
"document_type": "Circleback Meeting",
**{
k: v
for k, v in metadata.items()
if isinstance(v, str | int | float | bool)
},
}
summary_content, summary_embedding = await generate_document_summary(
markdown_content, llm, summary_metadata
)
summary_content = markdown_content
summary_embedding = embed_text(summary_content)
# Process chunks
chunks = await create_document_chunks(markdown_content)
@ -224,8 +197,7 @@ async def add_circleback_meeting_document(
document.title = meeting_name
document.content = summary_content
document.content_hash = content_hash
if summary_embedding is not None:
document.embedding = summary_embedding
document.embedding = summary_embedding
document.document_metadata = document_metadata
await safe_set_chunks(session, document, chunks)
document.source_markdown = markdown_content

View file

@ -9,12 +9,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentType
from app.schemas import ExtensionDocumentContent
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
@ -123,26 +122,8 @@ async def add_extension_received_document(
f"Content changed for URL {content.metadata.VisitedWebPageURL}. Updating document."
)
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} in search space {search_space_id}"
)
# Generate summary with metadata
document_metadata = {
"session_id": content.metadata.BrowsingSessionId,
"url": content.metadata.VisitedWebPageURL,
"title": content.metadata.VisitedWebPageTitle,
"referrer": content.metadata.VisitedWebPageReffererURL,
"timestamp": content.metadata.VisitedWebPageDateWithTimeInISOString,
"duration_ms": content.metadata.VisitedWebPageVisitDurationInMilliseconds,
"document_type": "Browser Extension Capture",
}
summary_content, summary_embedding = await generate_document_summary(
combined_document_string, user_llm, document_metadata
)
summary_content = combined_document_string
summary_embedding = embed_text(summary_content)
# Process chunks
chunks = await create_document_chunks(content.pageContent)

View file

@ -10,7 +10,7 @@ from __future__ import annotations
import contextlib
import logging
import os
from dataclasses import dataclass, field
from dataclasses import dataclass
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
@ -49,12 +49,6 @@ class _ProcessingContext:
notification: Notification | None = None
use_vision_llm: bool = False
processing_mode: str = "basic"
enable_summary: bool = field(init=False)
def __post_init__(self) -> None:
self.enable_summary = (
self.connector.get("enable_summary", True) if self.connector else True
)
# ---------------------------------------------------------------------------
@ -262,7 +256,6 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
ctx.user_id,
etl_result.etl_service,
ctx.connector,
enable_summary=ctx.enable_summary,
)
if result:
@ -467,7 +460,6 @@ async def process_file_in_background_with_document(
log_entry: Log,
connector: dict | None = None,
notification: Notification | None = None,
should_summarize: bool = False,
use_vision_llm: bool = False,
processing_mode: str = "basic",
) -> Document | None:
@ -483,7 +475,6 @@ async def process_file_in_background_with_document(
from app.indexing_pipeline.adapters.file_upload_adapter import (
UploadDocumentAdapter,
)
from app.services.llm_service import get_user_long_context_llm
from app.utils.document_converters import generate_content_hash
from .base import check_duplicate_document
@ -523,8 +514,6 @@ async def process_file_in_background_with_document(
stage="chunking",
)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
adapter = UploadDocumentAdapter(session)
await adapter.index(
markdown_content=markdown_content,
@ -532,8 +521,6 @@ async def process_file_in_background_with_document(
etl_service=etl_service,
search_space_id=search_space_id,
user_id=user_id,
llm=user_llm,
should_summarize=should_summarize,
)
if billable_pages > 0:

View file

@ -8,12 +8,11 @@ from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
)
from ._helpers import (
@ -183,21 +182,8 @@ async def add_received_markdown_file_document(
return doc
# Content changed - continue to update
# Get user's long context LLM (needed for both create and update)
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} in search space {search_space_id}"
)
# Generate summary with metadata
document_metadata = {
"file_name": file_name,
"document_type": "Markdown File Document",
}
summary_content, summary_embedding = await generate_document_summary(
file_in_markdown, user_llm, document_metadata
)
summary_content = f"File: {file_name}\n\n{file_in_markdown[:4000]}"
summary_embedding = embed_text(summary_content)
# Process chunks
chunks = await create_document_chunks(file_in_markdown)

View file

@ -17,12 +17,11 @@ from sqlalchemy.ext.asyncio import AsyncSession
from youtube_transcript_api import YouTubeTranscriptApi
from app.db import Document, DocumentStatus, DocumentType
from app.services.llm_service import get_user_long_context_llm
from app.services.task_logging_service import TaskLoggingService
from app.utils.document_converters import (
create_document_chunks,
embed_text,
generate_content_hash,
generate_document_summary,
generate_unique_identifier_hash,
)
from app.utils.proxy_config import get_requests_proxies
@ -355,40 +354,8 @@ async def add_youtube_video_document(
await session.commit()
return document
# Get LLM for summary generation
await task_logger.log_task_progress(
log_entry,
f"Preparing for summary generation: {video_data.get('title', 'YouTube Video')}",
{"stage": "llm_setup"},
)
# Get user's long context LLM
user_llm = await get_user_long_context_llm(session, user_id, search_space_id)
if not user_llm:
raise RuntimeError(
f"No long context LLM configured for user {user_id} in search space {search_space_id}"
)
# Generate summary
await task_logger.log_task_progress(
log_entry,
f"Generating summary for video: {video_data.get('title', 'YouTube Video')}",
{"stage": "summary_generation"},
)
# Generate summary with metadata
document_metadata_for_summary = {
"url": url,
"video_id": video_id,
"title": video_data.get("title", "YouTube Video"),
"author": video_data.get("author_name", "Unknown"),
"thumbnail": video_data.get("thumbnail_url", ""),
"document_type": "YouTube Video Document",
"has_transcript": "No captions available" not in transcript_text,
}
summary_content, summary_embedding = await generate_document_summary(
combined_document_string, user_llm, document_metadata_for_summary
)
summary_content = combined_document_string
summary_embedding = embed_text(summary_content)
# Process chunks
await task_logger.log_task_progress(