def _build_batch_document_string(
    guild_name: str,
    guild_id: str,
    channel_name: str,
    channel_id: str,
    messages: list[dict],
) -> str:
    """
    Combine multiple Discord messages into a single document string.

    Each message is rendered as one ``[timestamp] author: content`` line and
    the lines are joined, in the order given, into a conversation-style text
    block. A METADATA section (guild, channel, message count and the
    timestamps of the first and last list entries) precedes the conversation.

    The first/last timestamps are taken positionally from ``messages``; the
    list is not sorted here, so callers should pass it in chronological order
    if those fields are meant to describe a time range.

    An empty ``messages`` list is tolerated: the document is still built, with
    ``MESSAGE_COUNT: 0``, "Unknown" first/last timestamps and an empty
    conversation body.

    Args:
        guild_name: Name of the Discord guild
        guild_id: ID of the Discord guild
        channel_name: Name of the channel
        channel_id: ID of the channel
        messages: List of message dicts; the keys read are 'author_name',
            'created_at' and 'content' (each falls back to a placeholder
            when missing)

    Returns:
        The string produced by ``build_document_metadata_markdown`` for the
        METADATA and CONTENT sections assembled here
    """
    # Guard the boundary lookups: indexing [0] / [-1] on an empty batch would
    # raise IndexError before any document could be produced.
    if messages:
        first_msg_time = messages[0].get("created_at", "Unknown")
        last_msg_time = messages[-1].get("created_at", "Unknown")
    else:
        first_msg_time = last_msg_time = "Unknown"

    metadata_lines = [
        f"GUILD_NAME: {guild_name}",
        f"GUILD_ID: {guild_id}",
        f"CHANNEL_NAME: {channel_name}",
        f"CHANNEL_ID: {channel_id}",
        f"MESSAGE_COUNT: {len(messages)}",
        f"FIRST_MESSAGE_TIME: {first_msg_time}",
        f"LAST_MESSAGE_TIME: {last_msg_time}",
    ]

    # One line per message, preserving input order so the text reads as a
    # conversation transcript.
    conversation_lines = [
        "[{timestamp}] {author}: {content}".format(
            timestamp=msg.get("created_at", "Unknown Time"),
            author=msg.get("author_name", "Unknown User"),
            content=msg.get("content", ""),
        )
        for msg in messages
    ]

    metadata_sections = [
        ("METADATA", metadata_lines),
        (
            "CONTENT",
            [
                "FORMAT: markdown",
                "TEXT_START",
                "\n".join(conversation_lines),
                "TEXT_END",
            ],
        ),
    ]

    return build_document_metadata_markdown(metadata_sections)
+ Implements 2-phase document status updates for real-time UI feedback: - Phase 1: Create all documents with 'pending' status (visible in UI immediately) - Phase 2: Process each document: pending → processing → ready/failed @@ -324,6 +399,7 @@ async def index_discord_messages( documents_skipped = 0 documents_failed = 0 duplicate_content_count = 0 + total_messages_collected = 0 skipped_channels: list[str] = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck @@ -340,10 +416,12 @@ async def index_discord_messages( ) # ======================================================================= - # PHASE 1: Collect all messages and create pending documents - # This makes ALL documents visible in the UI immediately with pending status + # PHASE 1: Collect messages, group into batches, and create pending documents + # Messages are grouped into batches of DISCORD_BATCH_SIZE per channel. + # Each batch becomes a single document with full conversational context. + # All documents are visible in the UI immediately with pending status. 
# ======================================================================= - messages_to_process = [] # List of dicts with document and message data + batches_to_process = [] # List of dicts with document and batch data new_documents_created = False try: @@ -394,44 +472,35 @@ async def index_discord_messages( ) continue - # Process each message as an individual document (like Slack) - for msg in formatted_messages: - msg_id = msg.get("id", "") - msg_user_name = msg.get("author_name", "Unknown User") - msg_timestamp = msg.get("created_at", "Unknown Time") - msg_text = msg.get("content", "") + total_messages_collected += len(formatted_messages) - # Format document metadata (similar to Slack) - metadata_sections = [ - ( - "METADATA", - [ - f"GUILD_NAME: {guild_name}", - f"GUILD_ID: {guild_id}", - f"CHANNEL_NAME: {channel_name}", - f"CHANNEL_ID: {channel_id}", - f"MESSAGE_TIMESTAMP: {msg_timestamp}", - f"MESSAGE_USER_NAME: {msg_user_name}", - ], - ), - ( - "CONTENT", - [ - "FORMAT: markdown", - "TEXT_START", - msg_text, - "TEXT_END", - ], - ), + # ======================================================= + # Group messages into batches of DISCORD_BATCH_SIZE + # Each batch becomes a single document with conversation context + # ======================================================= + for batch_start in range( + 0, len(formatted_messages), DISCORD_BATCH_SIZE + ): + batch = formatted_messages[ + batch_start : batch_start + DISCORD_BATCH_SIZE ] - # Build the document string - combined_document_string = build_document_metadata_markdown( - metadata_sections + # Build combined document string from all messages in this batch + combined_document_string = _build_batch_document_string( + guild_name=guild_name, + guild_id=guild_id, + channel_name=channel_name, + channel_id=channel_id, + messages=batch, ) - # Generate unique identifier hash for this Discord message - unique_identifier = f"{channel_id}_{msg_id}" + # Generate unique identifier for this batch using + # channel_id + 
first message ID + last message ID + first_msg_id = batch[0].get("id", "") + last_msg_id = batch[-1].get("id", "") + unique_identifier = ( + f"{channel_id}_{first_msg_id}_{last_msg_id}" + ) unique_identifier_hash = generate_unique_identifier_hash( DocumentType.DISCORD_CONNECTOR, unique_identifier, @@ -464,7 +533,7 @@ async def index_discord_messages( continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append( + batches_to_process.append( { "document": existing_document, "is_new": False, @@ -474,9 +543,15 @@ async def index_discord_messages( "guild_id": guild_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": msg_id, - "message_timestamp": msg_timestamp, - "message_user_name": msg_user_name, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "first_message_time": batch[0].get( + "created_at", "Unknown" + ), + "last_message_time": batch[-1].get( + "created_at", "Unknown" + ), + "message_count": len(batch), } ) continue @@ -492,7 +567,7 @@ async def index_discord_messages( if duplicate_by_content: logger.info( - f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector " + f"Discord batch ({len(batch)} msgs) in {guild_name}#{channel_name} already indexed by another connector " f"(existing document ID: {duplicate_by_content.id}, " f"type: {duplicate_by_content.document_type}). Skipping." 
) @@ -510,7 +585,9 @@ async def index_discord_messages( "guild_id": guild_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": msg_id, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "message_count": len(batch), "connector_id": connector_id, }, content="Pending...", # Placeholder until processed @@ -526,7 +603,7 @@ async def index_discord_messages( session.add(document) new_documents_created = True - messages_to_process.append( + batches_to_process.append( { "document": document, "is_new": True, @@ -536,12 +613,23 @@ async def index_discord_messages( "guild_id": guild_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": msg_id, - "message_timestamp": msg_timestamp, - "message_user_name": msg_user_name, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "first_message_time": batch[0].get( + "created_at", "Unknown" + ), + "last_message_time": batch[-1].get( + "created_at", "Unknown" + ), + "message_count": len(batch), } ) + logger.info( + f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}, " + f"grouped into {(len(formatted_messages) + DISCORD_BATCH_SIZE - 1) // DISCORD_BATCH_SIZE} batch(es)" + ) + except Exception as e: logger.error( f"Error processing guild {guild_name}: {e!s}", exc_info=True @@ -554,17 +642,20 @@ async def index_discord_messages( # Commit all pending documents - they all appear in UI now if new_documents_created: logger.info( - f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" + f"Phase 1: Committing {len([b for b in batches_to_process if b['is_new']])} pending batch documents " + f"({total_messages_collected} total messages across all channels)" ) await session.commit() # ======================================================================= - # PHASE 2: Process each document one by one + # PHASE 2: Process each batch document one by one # Each document transitions: pending → 
processing → ready/failed # ======================================================================= - logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + logger.info( + f"Phase 2: Processing {len(batches_to_process)} batch documents" + ) - for item in messages_to_process: + for item in batches_to_process: # Send heartbeat periodically if on_heartbeat_callback: current_time = time.time() @@ -594,9 +685,11 @@ async def index_discord_messages( "guild_id": item["guild_id"], "channel_name": item["channel_name"], "channel_id": item["channel_id"], - "message_id": item["message_id"], - "message_timestamp": item["message_timestamp"], - "message_user_name": item["message_user_name"], + "first_message_id": item["first_message_id"], + "last_message_id": item["last_message_id"], + "first_message_time": item["first_message_time"], + "last_message_time": item["last_message_time"], + "message_count": item["message_count"], "indexed_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -609,12 +702,14 @@ async def index_discord_messages( # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( - f"Committing batch: {documents_indexed} Discord messages processed so far" + f"Committing batch: {documents_indexed} batch documents processed so far" ) await session.commit() except Exception as e: - logger.error(f"Error processing Discord message: {e!s}", exc_info=True) + logger.error( + f"Error processing Discord batch document: {e!s}", exc_info=True + ) # Mark document as failed with reason (visible in UI) try: document.status = DocumentStatus.failed(str(e)) @@ -631,7 +726,8 @@ async def index_discord_messages( # Final commit for any remaining documents not yet committed in batches logger.info( - f"Final commit: Total {documents_indexed} Discord messages processed" + f"Final commit: Total {documents_indexed} batch documents processed " + f"(from {total_messages_collected} 
messages)" ) try: await session.commit() @@ -672,14 +768,18 @@ async def index_discord_messages( "documents_failed": documents_failed, "duplicate_content_count": duplicate_content_count, "skipped_channels_count": len(skipped_channels), + "total_messages_collected": total_messages_collected, + "batch_size": DISCORD_BATCH_SIZE, "guild_id": guild_id, "guild_name": guild_name, }, ) logger.info( - f"Discord indexing completed for guild {guild_name}: {documents_indexed} ready, {documents_skipped} skipped, " - f"{documents_failed} failed ({duplicate_content_count} duplicate content)" + f"Discord indexing completed for guild {guild_name}: " + f"{documents_indexed} batch docs ready (from {total_messages_collected} messages), " + f"{documents_skipped} skipped, {documents_failed} failed " + f"({duplicate_content_count} duplicate content)" ) return documents_indexed, warning_message