From 1791241c0c8c451b7c82ec6e688b4264498da3fb Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 20 May 2026 10:09:38 +0200 Subject: [PATCH] perf(indexers): offload sync embed_text to thread across background workers Connector kb_sync_services (gmail, onedrive, google_calendar, jira), streaming indexers (discord, luma, teams) and the file-processor save path all called embed_text inside async coroutines, blocking the background worker's event loop for the duration of the embed. Wrap each call site in asyncio.to_thread so concurrent indexing tasks stop serialising on the embed. --- surfsense_backend/app/services/gmail/kb_sync_service.py | 5 ++++- .../app/services/google_calendar/kb_sync_service.py | 8 ++++++-- surfsense_backend/app/services/jira/kb_sync_service.py | 8 ++++++-- .../app/services/onedrive/kb_sync_service.py | 5 ++++- .../app/tasks/connector_indexers/discord_indexer.py | 4 +++- .../app/tasks/connector_indexers/luma_indexer.py | 5 ++++- .../app/tasks/connector_indexers/teams_indexer.py | 5 ++++- surfsense_backend/app/tasks/document_processors/_save.py | 5 +++-- 8 files changed, 34 insertions(+), 11 deletions(-) diff --git a/surfsense_backend/app/services/gmail/kb_sync_service.py b/surfsense_backend/app/services/gmail/kb_sync_service.py index 885ee4b94..6ff5f3c2b 100644 --- a/surfsense_backend/app/services/gmail/kb_sync_service.py +++ b/surfsense_backend/app/services/gmail/kb_sync_service.py @@ -1,3 +1,4 @@ +import asyncio import logging from datetime import datetime @@ -100,7 +101,9 @@ class GmailKBSyncService: else: logger.warning("No LLM configured -- using fallback summary") summary_content = f"Gmail Message: {subject}\n\n{indexable_content}" - summary_embedding = embed_text(summary_content) + summary_embedding = await asyncio.to_thread( + embed_text, summary_content + ) chunks = await create_document_chunks(indexable_content) now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") diff --git a/surfsense_backend/app/services/google_calendar/kb_sync_service.py b/surfsense_backend/app/services/google_calendar/kb_sync_service.py index 602a55738..1f017ec4d 100644 --- a/surfsense_backend/app/services/google_calendar/kb_sync_service.py +++ b/surfsense_backend/app/services/google_calendar/kb_sync_service.py @@ -116,7 +116,9 @@ class GoogleCalendarKBSyncService: summary_content = ( f"Google Calendar Event: {event_summary}\n\n{indexable_content}" ) - summary_embedding = embed_text(summary_content) + summary_embedding = await asyncio.to_thread( + embed_text, summary_content + ) chunks = await create_document_chunks(indexable_content) now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -295,7 +297,9 @@ class GoogleCalendarKBSyncService: summary_content = ( f"Google Calendar Event: {event_summary}\n\n{indexable_content}" ) - summary_embedding = embed_text(summary_content) + summary_embedding = await asyncio.to_thread( + embed_text, summary_content + ) chunks = await create_document_chunks(indexable_content) now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") diff --git a/surfsense_backend/app/services/jira/kb_sync_service.py b/surfsense_backend/app/services/jira/kb_sync_service.py index 8e88bee81..5f6668377 100644 --- a/surfsense_backend/app/services/jira/kb_sync_service.py +++ b/surfsense_backend/app/services/jira/kb_sync_service.py @@ -98,7 +98,9 @@ class JiraKBSyncService: summary_content = ( f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}" ) - summary_embedding = embed_text(summary_content) + summary_embedding = await asyncio.to_thread( + embed_text, summary_content + ) chunks = await create_document_chunks(issue_content) now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") @@ -212,7 +214,9 @@ class JiraKBSyncService: summary_content = ( f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}" ) - summary_embedding = embed_text(summary_content) + summary_embedding = await asyncio.to_thread( + embed_text, summary_content + ) chunks = await create_document_chunks(issue_content) diff --git a/surfsense_backend/app/services/onedrive/kb_sync_service.py b/surfsense_backend/app/services/onedrive/kb_sync_service.py index e9b2e38ea..e1da3b4a1 100644 --- a/surfsense_backend/app/services/onedrive/kb_sync_service.py +++ b/surfsense_backend/app/services/onedrive/kb_sync_service.py @@ -1,3 +1,4 @@ +import asyncio import logging from datetime import datetime @@ -95,7 +96,9 @@ class OneDriveKBSyncService: else: logger.warning("No LLM configured — using fallback summary") summary_content = f"OneDrive File: {file_name}\n\n{indexable_content}" - summary_embedding = embed_text(summary_content) + summary_embedding = await asyncio.to_thread( + embed_text, summary_content + ) chunks = await create_document_chunks(indexable_content) now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S") diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 5e784cb4f..180f21412 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -670,7 +670,9 @@ async def index_discord_messages( # Heavy processing (embeddings, chunks) chunks = await create_document_chunks(item["combined_document_string"]) - doc_embedding = embed_text(item["combined_document_string"]) + doc_embedding = await asyncio.to_thread( + embed_text, item["combined_document_string"] + ) # Update document to READY with actual content document.title = f"{item['guild_name']}#{item['channel_name']}" diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index a698bfd46..555d60273 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -6,6 +6,7 @@ Implements 2-phase document status updates for real-time UI feedback: - Phase 2: Process each event: pending → processing → ready/failed """ +import asyncio import time from collections.abc import Awaitable, Callable from datetime import datetime, timedelta @@ -465,7 +466,9 @@ async def index_luma_events( summary_content = ( f"Luma Event: {item['event_name']}\n\n{item['event_markdown']}" ) - summary_embedding = embed_text(summary_content) + summary_embedding = await asyncio.to_thread( + embed_text, summary_content + ) chunks = await create_document_chunks(item["event_markdown"]) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 12cdf384e..25994895a 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -9,6 +9,7 @@ Uses 2-phase document status updates for real-time UI feedback: - Phase 2: Process each document: pending → processing → ready/failed """ +import asyncio import time from collections.abc import Awaitable, Callable from datetime import UTC, datetime @@ -581,7 +582,9 @@ async def index_teams_messages( # Heavy processing (embeddings, chunks) chunks = await create_document_chunks(item["combined_document_string"]) - doc_embedding = embed_text(item["combined_document_string"]) + doc_embedding = await asyncio.to_thread( + embed_text, item["combined_document_string"] + ) # Update document to READY with actual content document.title = f"{item['team_name']} - {item['channel_name']}" diff --git a/surfsense_backend/app/tasks/document_processors/_save.py b/surfsense_backend/app/tasks/document_processors/_save.py index ae45f7a69..d633dd4f6 100644 --- a/surfsense_backend/app/tasks/document_processors/_save.py +++ b/surfsense_backend/app/tasks/document_processors/_save.py @@ -2,6 +2,7 @@ Unified document save/update logic for file processors. """ +import asyncio import logging from sqlalchemy.exc import SQLAlchemyError @@ -43,7 +44,7 @@ async def _generate_summary( """ if not enable_summary: summary = f"File: {file_name}\n\n{markdown_content[:4000]}" - return summary, embed_text(summary) + return summary, await asyncio.to_thread(embed_text, summary) if etl_service == "DOCLING": from app.services.docling_service import create_docling_service @@ -65,7 +66,7 @@ async def _generate_summary( parts.append(f"**{formatted_key}:** {value}") enhanced = "\n".join(parts) + "\n\n# DOCUMENT SUMMARY\n\n" + summary_text - return enhanced, embed_text(enhanced) + return enhanced, await asyncio.to_thread(embed_text, enhanced) # Standard summary (Unstructured / LlamaCloud / others) meta = {