feat: implement parallel document indexing in IndexingPipelineService

- Added `index_batch_parallel` method to enable concurrent indexing of documents with bounded concurrency, improving performance and efficiency.
- Refactored existing indexing logic to utilize `asyncio.to_thread` for non-blocking execution of embedding and chunking functions.
- Introduced unit tests to validate the functionality of the new parallel indexing method, ensuring robustness and error handling during document processing.
This commit is contained in:
Anish Sarkar 2026-03-26 19:33:49 +05:30
parent bbd5ee8a19
commit e5cb6bfacf
3 changed files with 76 additions and 4 deletions

View file

@ -1,3 +1,4 @@
import asyncio
import contextlib
import time
from datetime import UTC, datetime
@ -257,13 +258,14 @@ class IndexingPipelineService:
)
t_step = time.perf_counter()
chunk_texts = chunk_text(
chunk_texts = await asyncio.to_thread(
chunk_text,
connector_doc.source_markdown,
use_code_chunker=connector_doc.should_use_code_chunker,
)
texts_to_embed = [content, *chunk_texts]
embeddings = embed_texts(texts_to_embed)
embeddings = await asyncio.to_thread(embed_texts, texts_to_embed)
summary_embedding, *chunk_embeddings = embeddings
chunks = [

View file

@ -0,0 +1,70 @@
import asyncio
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from app.config import config as app_config
from app.db import Document, DocumentStatus, DocumentType
from app.indexing_pipeline.document_hashing import compute_unique_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
_EMBEDDING_DIM = app_config.embedding_model_instance.dimension
pytestmark = pytest.mark.unit
@pytest.fixture
def mock_session():
session = AsyncMock()
session.refresh = AsyncMock()
return session
@pytest.fixture
def pipeline(mock_session):
return IndexingPipelineService(mock_session)
async def test_index_calls_embed_and_chunk_via_to_thread(
pipeline, make_connector_document, monkeypatch
):
"""index() runs embed_texts and chunk_text via asyncio.to_thread, not blocking the loop."""
to_thread_calls = []
original_to_thread = asyncio.to_thread
async def tracking_to_thread(func, *args, **kwargs):
to_thread_calls.append(func.__name__)
return await original_to_thread(func, *args, **kwargs)
monkeypatch.setattr(asyncio, "to_thread", tracking_to_thread)
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.summarize_document",
AsyncMock(return_value="Summary."),
)
mock_chunk = MagicMock(return_value=["chunk1"])
mock_chunk.__name__ = "chunk_text"
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.chunk_text",
mock_chunk,
)
mock_embed = MagicMock(side_effect=lambda texts: [[0.1] * _EMBEDDING_DIM for _ in texts])
mock_embed.__name__ = "embed_texts"
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.embed_texts",
mock_embed,
)
connector_doc = make_connector_document(
document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
unique_id="msg-1",
search_space_id=1,
)
document = MagicMock(spec=Document)
document.id = 1
document.status = DocumentStatus.pending()
await pipeline.index(document, connector_doc, llm=MagicMock())
assert "chunk_text" in to_thread_calls
assert "embed_texts" in to_thread_calls

View file

@ -5,10 +5,10 @@ import {
FileText,
Film,
Globe,
ImageIcon,
type LucideIcon,
Podcast,
ScanLine,
Sparkles,
Wrench,
} from "lucide-react";
@ -17,7 +17,7 @@ const TOOL_ICONS: Record<string, LucideIcon> = {
generate_podcast: Podcast,
generate_video_presentation: Film,
generate_report: FileText,
generate_image: Sparkles,
generate_image: ImageIcon,
scrape_webpage: ScanLine,
web_search: Globe,
search_surfsense_docs: BookOpen,