refactor: streamline Elasticsearch indexing by removing unused services and integrating document chunking, also added documentation

This commit is contained in:
Anish Sarkar 2025-10-16 17:48:28 +05:30
parent 3c0f5e42f3
commit 82438c7396
2 changed files with 14 additions and 59 deletions

View file

@ -10,7 +10,7 @@
# SurfSense
While tools like NotebookLM and Perplexity are impressive and highly effective for conducting research on any topic/query, SurfSense elevates this capability by integrating with your personal knowledge base. It is a highly customizable AI research agent, connected to external sources such as Search Engines (SearxNG, Tavily, LinkUp), Slack, Linear, Jira, ClickUp, Confluence, Gmail, Notion, YouTube, GitHub, Discord, Airtable, Google Calendar, Luma and more to come.
While tools like NotebookLM and Perplexity are impressive and highly effective for conducting research on any topic/query, SurfSense elevates this capability by integrating with your personal knowledge base. It is a highly customizable AI research agent, connected to external sources such as Search Engines (SearxNG, Tavily, LinkUp), Slack, Linear, Jira, ClickUp, Confluence, Gmail, Notion, YouTube, GitHub, Discord, Airtable, Google Calendar, Luma, Elasticsearch and more to come.
<div align="center">
<a href="https://trendshift.io/repositories/13606" target="_blank"><img src="https://trendshift.io/api/badge/repositories/13606" alt="MODSetter%2FSurfSense | Trendshift" style="width: 250px; height: 55px;" width="250" height="55"/></a>

View file

@ -13,55 +13,12 @@ from sqlalchemy.future import select
from app.connectors.elasticsearch_connector import ElasticsearchConnector
from app.db import Document, DocumentType, SearchSourceConnector
from app.utils.document_converters import create_document_chunks
from .base import check_document_by_unique_identifier
logger = logging.getLogger(__name__)
class _ChunkingService:
def __init__(self, chunk_size: int = 1000, overlap: int = 200) -> None:
self.chunk_size = max(100, chunk_size)
self.overlap = max(0, min(overlap, self.chunk_size - 1))
def chunk_text(self, text: str) -> list[str]:
if not text:
return []
text = text.strip()
if len(text) <= self.chunk_size:
return [text]
chunks: list[str] = []
step = self.chunk_size - self.overlap
pos = 0
while pos < len(text):
end = pos + self.chunk_size
chunks.append(text[pos:end].strip())
pos += step
return chunks
class _DocumentService:
def __init__(self, session):
self.session = session
async def get_document_by_hash(self, content_hash: str):
from sqlalchemy.future import select
from app.db import Document
if not content_hash:
return None
result = await self.session.execute(
select(Document).where(Document.content_hash == content_hash)
)
return result.scalars().first()
async def create_chunks_for_document(self, document_id: int, chunks: list[str]):
from app.db import Chunk
for chunk_text in chunks:
self.session.add(Chunk(content=chunk_text, document_id=document_id))
await self.session.flush()
async def index_elasticsearch_documents(
session: AsyncSession,
connector_id: int,
@ -136,8 +93,7 @@ async def index_elasticsearch_documents(
return 0, error_msg
# Initialize document service
document_service = _DocumentService(session)
chunking_service = _ChunkingService()
# document_service = _DocumentService(session)
# Initialize Elasticsearch connector
es_connector = ElasticsearchConnector(
@ -216,8 +172,11 @@ async def index_elasticsearch_documents(
metadata[f"es_{field}"] = source[field]
# Check if document already exists
existing_doc = await document_service.get_document_by_hash(
content_hash
existing_doc = await check_document_by_unique_identifier(
session,
DocumentType.ELASTICSEARCH_CONNECTOR,
content_hash,
search_space_id,
)
if existing_doc:
@ -234,15 +193,11 @@ async def index_elasticsearch_documents(
search_space_id=search_space_id,
)
# Add document to session
# Create chunks and attach to document (persist via relationship)
chunks = await create_document_chunks(content)
document.chunks = chunks
session.add(document)
await session.flush() # Get the document ID
# Create chunks
chunks = chunking_service.chunk_text(content)
await document_service.create_chunks_for_document(
document.id, chunks
)
await session.flush()
documents_processed += 1