From 82438c739642ce77ad9d0b87db6b2f65937a1f14 Mon Sep 17 00:00:00 2001
From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com>
Date: Thu, 16 Oct 2025 17:48:28 +0530
Subject: [PATCH] refactor: streamline Elasticsearch indexing by removing
unused services and integrating document chunking, also added documentation
---
README.md | 2 +-
.../elasticsearch_indexer.py | 71 ++++---------------
2 files changed, 14 insertions(+), 59 deletions(-)
diff --git a/README.md b/README.md
index 52caafc30..5cd73d0c3 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,7 @@
# SurfSense
-While tools like NotebookLM and Perplexity are impressive and highly effective for conducting research on any topic/query, SurfSense elevates this capability by integrating with your personal knowledge base. It is a highly customizable AI research agent, connected to external sources such as Search Engines (SearxNG, Tavily, LinkUp), Slack, Linear, Jira, ClickUp, Confluence, Gmail, Notion, YouTube, GitHub, Discord, Airtable, Google Calendar, Luma and more to come.
+While tools like NotebookLM and Perplexity are impressive and highly effective for conducting research on any topic/query, SurfSense elevates this capability by integrating with your personal knowledge base. It is a highly customizable AI research agent, connected to external sources such as Search Engines (SearxNG, Tavily, LinkUp), Slack, Linear, Jira, ClickUp, Confluence, Gmail, Notion, YouTube, GitHub, Discord, Airtable, Google Calendar, Luma, Elasticsearch and more to come.

diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py
index 53e9d61ad..ba81a53f4 100644
--- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py
@@ -13,55 +13,12 @@ from sqlalchemy.future import select
from app.connectors.elasticsearch_connector import ElasticsearchConnector
from app.db import Document, DocumentType, SearchSourceConnector
+from app.utils.document_converters import create_document_chunks
+
+from .base import check_document_by_unique_identifier
logger = logging.getLogger(__name__)
-
-class _ChunkingService:
- def __init__(self, chunk_size: int = 1000, overlap: int = 200) -> None:
- self.chunk_size = max(100, chunk_size)
- self.overlap = max(0, min(overlap, self.chunk_size - 1))
-
- def chunk_text(self, text: str) -> list[str]:
- if not text:
- return []
- text = text.strip()
- if len(text) <= self.chunk_size:
- return [text]
- chunks: list[str] = []
- step = self.chunk_size - self.overlap
- pos = 0
- while pos < len(text):
- end = pos + self.chunk_size
- chunks.append(text[pos:end].strip())
- pos += step
- return chunks
-
-
-class _DocumentService:
- def __init__(self, session):
- self.session = session
-
- async def get_document_by_hash(self, content_hash: str):
- from sqlalchemy.future import select
-
- from app.db import Document
-
- if not content_hash:
- return None
- result = await self.session.execute(
- select(Document).where(Document.content_hash == content_hash)
- )
- return result.scalars().first()
-
- async def create_chunks_for_document(self, document_id: int, chunks: list[str]):
- from app.db import Chunk
-
- for chunk_text in chunks:
- self.session.add(Chunk(content=chunk_text, document_id=document_id))
- await self.session.flush()
-
-
async def index_elasticsearch_documents(
session: AsyncSession,
connector_id: int,
@@ -136,8 +93,7 @@ async def index_elasticsearch_documents(
return 0, error_msg
# Initialize document service
- document_service = _DocumentService(session)
- chunking_service = _ChunkingService()
+ # document_service = _DocumentService(session)
# Initialize Elasticsearch connector
es_connector = ElasticsearchConnector(
@@ -216,8 +172,11 @@ async def index_elasticsearch_documents(
metadata[f"es_{field}"] = source[field]
# Check if document already exists
- existing_doc = await document_service.get_document_by_hash(
- content_hash
+ existing_doc = await check_document_by_unique_identifier(
+ session,
+ DocumentType.ELASTICSEARCH_CONNECTOR,
+ content_hash,
+ search_space_id,
)
if existing_doc:
@@ -234,15 +193,11 @@ async def index_elasticsearch_documents(
search_space_id=search_space_id,
)
- # Add document to session
+ # Create chunks and attach to document (persist via relationship)
+ chunks = await create_document_chunks(content)
+ document.chunks = chunks
session.add(document)
- await session.flush() # Get the document ID
-
- # Create chunks
- chunks = chunking_service.chunk_text(content)
- await document_service.create_chunks_for_document(
- document.id, chunks
- )
+ await session.flush()
documents_processed += 1