mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-27 19:25:15 +02:00
perf(indexers): offload sync embed_text to thread across background workers
Connector kb_sync_services (gmail, onedrive, google_calendar, jira), streaming indexers (discord, luma, teams) and the file-processor save path all called embed_text inside async coroutines, blocking the background worker's event loop for the duration of the embed. Wrap each call site in asyncio.to_thread so concurrent indexing tasks stop serialising on the embed.
This commit is contained in:
parent
a8de98895a
commit
1791241c0c
8 changed files with 34 additions and 11 deletions
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
@ -100,7 +101,9 @@ class GmailKBSyncService:
|
||||||
else:
|
else:
|
||||||
logger.warning("No LLM configured -- using fallback summary")
|
logger.warning("No LLM configured -- using fallback summary")
|
||||||
summary_content = f"Gmail Message: {subject}\n\n{indexable_content}"
|
summary_content = f"Gmail Message: {subject}\n\n{indexable_content}"
|
||||||
summary_embedding = embed_text(summary_content)
|
summary_embedding = await asyncio.to_thread(
|
||||||
|
embed_text, summary_content
|
||||||
|
)
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
|
||||||
|
|
@ -116,7 +116,9 @@ class GoogleCalendarKBSyncService:
|
||||||
summary_content = (
|
summary_content = (
|
||||||
f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
|
f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
|
||||||
)
|
)
|
||||||
summary_embedding = embed_text(summary_content)
|
summary_embedding = await asyncio.to_thread(
|
||||||
|
embed_text, summary_content
|
||||||
|
)
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
@ -295,7 +297,9 @@ class GoogleCalendarKBSyncService:
|
||||||
summary_content = (
|
summary_content = (
|
||||||
f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
|
f"Google Calendar Event: {event_summary}\n\n{indexable_content}"
|
||||||
)
|
)
|
||||||
summary_embedding = embed_text(summary_content)
|
summary_embedding = await asyncio.to_thread(
|
||||||
|
embed_text, summary_content
|
||||||
|
)
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
|
||||||
|
|
@ -98,7 +98,9 @@ class JiraKBSyncService:
|
||||||
summary_content = (
|
summary_content = (
|
||||||
f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
|
f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
|
||||||
)
|
)
|
||||||
summary_embedding = embed_text(summary_content)
|
summary_embedding = await asyncio.to_thread(
|
||||||
|
embed_text, summary_content
|
||||||
|
)
|
||||||
|
|
||||||
chunks = await create_document_chunks(issue_content)
|
chunks = await create_document_chunks(issue_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
@ -212,7 +214,9 @@ class JiraKBSyncService:
|
||||||
summary_content = (
|
summary_content = (
|
||||||
f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
|
f"Jira Issue {issue_identifier}: {issue_title}\n\n{issue_content}"
|
||||||
)
|
)
|
||||||
summary_embedding = embed_text(summary_content)
|
summary_embedding = await asyncio.to_thread(
|
||||||
|
embed_text, summary_content
|
||||||
|
)
|
||||||
|
|
||||||
chunks = await create_document_chunks(issue_content)
|
chunks = await create_document_chunks(issue_content)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,3 +1,4 @@
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
@ -95,7 +96,9 @@ class OneDriveKBSyncService:
|
||||||
else:
|
else:
|
||||||
logger.warning("No LLM configured — using fallback summary")
|
logger.warning("No LLM configured — using fallback summary")
|
||||||
summary_content = f"OneDrive File: {file_name}\n\n{indexable_content}"
|
summary_content = f"OneDrive File: {file_name}\n\n{indexable_content}"
|
||||||
summary_embedding = embed_text(summary_content)
|
summary_embedding = await asyncio.to_thread(
|
||||||
|
embed_text, summary_content
|
||||||
|
)
|
||||||
|
|
||||||
chunks = await create_document_chunks(indexable_content)
|
chunks = await create_document_chunks(indexable_content)
|
||||||
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
|
||||||
|
|
|
||||||
|
|
@ -670,7 +670,9 @@ async def index_discord_messages(
|
||||||
|
|
||||||
# Heavy processing (embeddings, chunks)
|
# Heavy processing (embeddings, chunks)
|
||||||
chunks = await create_document_chunks(item["combined_document_string"])
|
chunks = await create_document_chunks(item["combined_document_string"])
|
||||||
doc_embedding = embed_text(item["combined_document_string"])
|
doc_embedding = await asyncio.to_thread(
|
||||||
|
embed_text, item["combined_document_string"]
|
||||||
|
)
|
||||||
|
|
||||||
# Update document to READY with actual content
|
# Update document to READY with actual content
|
||||||
document.title = f"{item['guild_name']}#{item['channel_name']}"
|
document.title = f"{item['guild_name']}#{item['channel_name']}"
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ Implements 2-phase document status updates for real-time UI feedback:
|
||||||
- Phase 2: Process each event: pending → processing → ready/failed
|
- Phase 2: Process each event: pending → processing → ready/failed
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import time
|
import time
|
||||||
from collections.abc import Awaitable, Callable
|
from collections.abc import Awaitable, Callable
|
||||||
from datetime import datetime, timedelta
|
from datetime import datetime, timedelta
|
||||||
|
|
@ -465,7 +466,9 @@ async def index_luma_events(
|
||||||
summary_content = (
|
summary_content = (
|
||||||
f"Luma Event: {item['event_name']}\n\n{item['event_markdown']}"
|
f"Luma Event: {item['event_name']}\n\n{item['event_markdown']}"
|
||||||
)
|
)
|
||||||
summary_embedding = embed_text(summary_content)
|
summary_embedding = await asyncio.to_thread(
|
||||||
|
embed_text, summary_content
|
||||||
|
)
|
||||||
|
|
||||||
chunks = await create_document_chunks(item["event_markdown"])
|
chunks = await create_document_chunks(item["event_markdown"])
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,7 @@ Uses 2-phase document status updates for real-time UI feedback:
|
||||||
- Phase 2: Process each document: pending → processing → ready/failed
|
- Phase 2: Process each document: pending → processing → ready/failed
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import time
|
import time
|
||||||
from collections.abc import Awaitable, Callable
|
from collections.abc import Awaitable, Callable
|
||||||
from datetime import UTC, datetime
|
from datetime import UTC, datetime
|
||||||
|
|
@ -581,7 +582,9 @@ async def index_teams_messages(
|
||||||
|
|
||||||
# Heavy processing (embeddings, chunks)
|
# Heavy processing (embeddings, chunks)
|
||||||
chunks = await create_document_chunks(item["combined_document_string"])
|
chunks = await create_document_chunks(item["combined_document_string"])
|
||||||
doc_embedding = embed_text(item["combined_document_string"])
|
doc_embedding = await asyncio.to_thread(
|
||||||
|
embed_text, item["combined_document_string"]
|
||||||
|
)
|
||||||
|
|
||||||
# Update document to READY with actual content
|
# Update document to READY with actual content
|
||||||
document.title = f"{item['team_name']} - {item['channel_name']}"
|
document.title = f"{item['team_name']} - {item['channel_name']}"
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
Unified document save/update logic for file processors.
|
Unified document save/update logic for file processors.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import asyncio
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from sqlalchemy.exc import SQLAlchemyError
|
from sqlalchemy.exc import SQLAlchemyError
|
||||||
|
|
@ -43,7 +44,7 @@ async def _generate_summary(
|
||||||
"""
|
"""
|
||||||
if not enable_summary:
|
if not enable_summary:
|
||||||
summary = f"File: {file_name}\n\n{markdown_content[:4000]}"
|
summary = f"File: {file_name}\n\n{markdown_content[:4000]}"
|
||||||
return summary, embed_text(summary)
|
return summary, await asyncio.to_thread(embed_text, summary)
|
||||||
|
|
||||||
if etl_service == "DOCLING":
|
if etl_service == "DOCLING":
|
||||||
from app.services.docling_service import create_docling_service
|
from app.services.docling_service import create_docling_service
|
||||||
|
|
@ -65,7 +66,7 @@ async def _generate_summary(
|
||||||
parts.append(f"**{formatted_key}:** {value}")
|
parts.append(f"**{formatted_key}:** {value}")
|
||||||
|
|
||||||
enhanced = "\n".join(parts) + "\n\n# DOCUMENT SUMMARY\n\n" + summary_text
|
enhanced = "\n".join(parts) + "\n\n# DOCUMENT SUMMARY\n\n" + summary_text
|
||||||
return enhanced, embed_text(enhanced)
|
return enhanced, await asyncio.to_thread(embed_text, enhanced)
|
||||||
|
|
||||||
# Standard summary (Unstructured / LlamaCloud / others)
|
# Standard summary (Unstructured / LlamaCloud / others)
|
||||||
meta = {
|
meta = {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue