feat: added elasticsearch connector

This commit is contained in:
Anish Sarkar 2025-10-12 09:39:04 +05:30
parent 402039f02f
commit 55d752e3c8
27 changed files with 4331 additions and 2499 deletions

View file

@ -17,6 +17,7 @@ Available indexers:
- Google Gmail: Index messages from Google Gmail
- Google Calendar: Index events from Google Calendar
- Luma: Index events from Luma
- Elasticsearch: Index documents from Elasticsearch instances
"""
# Communication platforms
@ -27,6 +28,7 @@ from .confluence_indexer import index_confluence_pages
from .discord_indexer import index_discord_messages
# Development platforms
from .elasticsearch_indexer import index_elasticsearch_documents
from .github_indexer import index_github_repos
from .google_calendar_indexer import index_google_calendar_events
from .google_gmail_indexer import index_google_gmail_messages
@ -46,6 +48,7 @@ __all__ = [ # noqa: RUF022
"index_confluence_pages",
"index_discord_messages",
# Development platforms
"index_elasticsearch_documents",
"index_github_repos",
# Calendar and scheduling
"index_google_calendar_events",

View file

@ -0,0 +1,354 @@
"""
Elasticsearch indexer for SurfSense
"""
import hashlib
import json
import logging
from datetime import UTC, datetime
from typing import Any
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.future import select
from app.connectors.elasticsearch_connector import ElasticsearchConnector
from app.db import Document, DocumentType, SearchSourceConnector
logger = logging.getLogger(__name__)
class _ChunkingService:
def __init__(self, chunk_size: int = 1000, overlap: int = 200) -> None:
self.chunk_size = max(100, chunk_size)
self.overlap = max(0, min(overlap, self.chunk_size - 1))
def chunk_text(self, text: str) -> list[str]:
if not text:
return []
text = text.strip()
if len(text) <= self.chunk_size:
return [text]
chunks: list[str] = []
step = self.chunk_size - self.overlap
pos = 0
while pos < len(text):
end = pos + self.chunk_size
chunks.append(text[pos:end].strip())
pos += step
return chunks
class _DocumentService:
def __init__(self, session):
self.session = session
async def get_document_by_hash(self, content_hash: str):
from sqlalchemy.future import select
from app.db import Document
if not content_hash:
return None
result = await self.session.execute(
select(Document).where(Document.content_hash == content_hash)
)
return result.scalars().first()
async def create_chunks_for_document(self, document_id: int, chunks: list[str]):
from app.db import Chunk
for chunk_text in chunks:
self.session.add(Chunk(content=chunk_text, document_id=document_id))
await self.session.flush()
async def index_elasticsearch_documents(
session: AsyncSession,
connector_id: int,
search_space_id: int,
user_id: str,
start_date: str,
end_date: str,
update_last_indexed: bool = True,
) -> tuple[int, str | None]:
"""
Index documents from Elasticsearch into SurfSense
Args:
session: Database session
connector_id: Elasticsearch connector ID
search_space_id: Search space ID
user_id: User ID
start_date: Start date for indexing (not used for Elasticsearch, kept for compatibility)
end_date: End date for indexing (not used for Elasticsearch, kept for compatibility)
update_last_indexed: Whether to update the last indexed timestamp
Returns:
Tuple of (number of documents processed, error message if any)
"""
es_connector = None
try:
# Get the connector configuration
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.id == connector_id
)
)
connector = result.scalars().first()
if not connector:
error_msg = f"Elasticsearch connector with ID {connector_id} not found"
logger.error(error_msg)
return 0, error_msg
# Get connector configuration
config = connector.config
# Validate required fields - now only URL and INDEX are required
# Authentication can be either API key OR username/password
if "ELASTICSEARCH_URL" not in config:
error_msg = "Missing required field in connector config: ELASTICSEARCH_URL"
logger.error(error_msg)
return 0, error_msg
if "ELASTICSEARCH_INDEX" not in config:
error_msg = (
"Missing required field in connector config: ELASTICSEARCH_INDEX"
)
logger.error(error_msg)
return 0, error_msg
# Check authentication - must have either API key or username+password
has_api_key = (
"ELASTICSEARCH_API_KEY" in config and config["ELASTICSEARCH_API_KEY"]
)
has_basic_auth = (
"ELASTICSEARCH_USERNAME" in config
and config["ELASTICSEARCH_USERNAME"]
and "ELASTICSEARCH_PASSWORD" in config
and config["ELASTICSEARCH_PASSWORD"]
)
if not has_api_key and not has_basic_auth:
error_msg = "Missing authentication: provide either ELASTICSEARCH_API_KEY or ELASTICSEARCH_USERNAME + ELASTICSEARCH_PASSWORD"
logger.error(error_msg)
return 0, error_msg
# Initialize document service
document_service = _DocumentService(session)
chunking_service = _ChunkingService()
# Initialize Elasticsearch connector
es_connector = ElasticsearchConnector(
url=config["ELASTICSEARCH_URL"],
api_key=config.get("ELASTICSEARCH_API_KEY"),
username=config.get("ELASTICSEARCH_USERNAME"),
password=config.get("ELASTICSEARCH_PASSWORD"),
verify_certs=config.get("ELASTICSEARCH_VERIFY_CERTS", True),
ca_certs=config.get("ELASTICSEARCH_CA_CERTS"),
)
# Build query based on configuration
query = _build_elasticsearch_query(config)
# Get the index name(s) - can be a string or list
index_name = config["ELASTICSEARCH_INDEX"]
# Get max documents to index
max_documents = config.get("ELASTICSEARCH_MAX_DOCUMENTS", 1000)
logger.info(
f"Starting Elasticsearch indexing for index '{index_name}' with max {max_documents} documents"
)
documents_processed = 0
try:
# Use scroll search for large result sets
async for hit in es_connector.scroll_search(
index=index_name,
query=query,
size=min(max_documents, 100), # Scroll in batches
fields=config.get("ELASTICSEARCH_FIELDS"),
):
if documents_processed >= max_documents:
break
try:
# Extract document data
doc_id = hit["_id"]
source = hit.get("_source", {})
# Build document title
title_field = config.get("ELASTICSEARCH_TITLE_FIELD")
if not title_field:
for candidate in ("title", "name", "subject"):
if candidate in source:
title_field = candidate
break
title = (
str(source.get(title_field, doc_id))
if title_field is not None
else str(doc_id)
)
# Build document content
content = _build_document_content(source, config)
if not content.strip():
logger.warning(f"Skipping document {doc_id} - no content found")
continue
# Create content hash
content_hash = hashlib.sha256(content.encode()).hexdigest()
# Build metadata
metadata = {
"elasticsearch_id": doc_id,
"elasticsearch_index": hit.get("_index", index_name),
"elasticsearch_score": hit.get("_score"),
"indexed_at": datetime.now().isoformat(),
"source": "ELASTICSEARCH_CONNECTOR",
}
# Add any additional metadata fields specified in config
if "ELASTICSEARCH_METADATA_FIELDS" in config:
for field in config["ELASTICSEARCH_METADATA_FIELDS"]:
if field in source:
metadata[f"es_{field}"] = source[field]
# Check if document already exists
existing_doc = await document_service.get_document_by_hash(
content_hash
)
if existing_doc:
logger.debug(f"Document {doc_id} already exists, skipping")
continue
# Create document
document = Document(
title=title,
content=content,
content_hash=content_hash,
document_type=DocumentType.ELASTICSEARCH_CONNECTOR,
document_metadata=metadata,
search_space_id=search_space_id,
)
# Add document to session
session.add(document)
await session.flush() # Get the document ID
# Create chunks
chunks = chunking_service.chunk_text(content)
await document_service.create_chunks_for_document(
document.id, chunks
)
documents_processed += 1
if documents_processed % 10 == 0:
logger.info(
f"Processed {documents_processed} Elasticsearch documents"
)
await session.commit()
except Exception as e:
logger.error(
f"Error processing Elasticsearch document {hit.get('_id', 'unknown')}: {e}"
)
continue
# Final commit
await session.commit()
logger.info(
f"Successfully indexed {documents_processed} documents from Elasticsearch"
)
# Update last indexed timestamp if requested
if update_last_indexed and documents_processed > 0:
connector.last_indexed_at = datetime.now()
await session.commit()
if update_last_indexed and documents_processed > 0:
# store ISO-8601 UTC timestamp with 'Z' suffix, e.g. 2025-10-09T22:04:53.599658Z
connector.last_indexed_at = (
datetime.now(UTC).isoformat().replace("+00:00", "Z")
)
await session.commit()
return documents_processed, None
finally:
# Clean up Elasticsearch connection
if es_connector:
await es_connector.close()
except Exception as e:
error_msg = f"Error indexing Elasticsearch documents: {e}"
logger.error(error_msg, exc_info=True)
await session.rollback()
if es_connector:
await es_connector.close()
return 0, error_msg
def _build_elasticsearch_query(config: dict[str, Any]) -> dict[str, Any]:
"""
Build Elasticsearch query from connector configuration
Args:
config: Connector configuration
Returns:
Elasticsearch query DSL
"""
# Check if custom query is provided
if config.get("ELASTICSEARCH_QUERY"):
try:
if isinstance(config["ELASTICSEARCH_QUERY"], str):
return json.loads(config["ELASTICSEARCH_QUERY"])
else:
return config["ELASTICSEARCH_QUERY"]
except (json.JSONDecodeError, TypeError) as e:
logger.warning(f"Invalid custom query, using match_all: {e}")
# Default to match all documents
return {"match_all": {}}
def _build_document_content(source: dict[str, Any], config: dict[str, Any]) -> str:
"""
Build document content from Elasticsearch document source
Args:
source: Elasticsearch document source
config: Connector configuration
Returns:
Formatted document content
"""
content_parts = []
# Get content fields from config
content_fields = config.get("ELASTICSEARCH_CONTENT_FIELDS", [])
if content_fields:
# Use specified content fields
for field in content_fields:
if field in source:
field_value = source[field]
if isinstance(field_value, str | int | float):
content_parts.append(f"{field}: {field_value}")
if isinstance(field_value, str | int | float):
content_parts.append(f"{field}: {json.dumps(field_value)}")
else:
# Use all fields if no specific content fields specified
for key, value in source.items():
if isinstance(value, str | int | float):
content_parts.append(f"{key}: {value}")
elif isinstance(value, list | dict):
content_parts.append(f"{key}: {json.dumps(value)}")
return "\n".join(content_parts)