From 70cc8b44c8eafa3d8bec62b55b85c89d0b745e20 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sat, 7 Feb 2026 16:18:11 +0530 Subject: [PATCH 1/9] refactor: simplify LocalLoginForm and enhance RowActions logic for document editing --- surfsense_web/app/(home)/login/LocalLoginForm.tsx | 9 +-------- .../documents/(manage)/components/RowActions.tsx | 8 ++++++-- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/surfsense_web/app/(home)/login/LocalLoginForm.tsx b/surfsense_web/app/(home)/login/LocalLoginForm.tsx index 5b2edae71..21928708c 100644 --- a/surfsense_web/app/(home)/login/LocalLoginForm.tsx +++ b/surfsense_web/app/(home)/login/LocalLoginForm.tsx @@ -16,7 +16,6 @@ import { trackLoginAttempt, trackLoginFailure, trackLoginSuccess } from "@/lib/p export function LocalLoginForm() { const t = useTranslations("auth"); - const tCommon = useTranslations("common"); const [username, setUsername] = useState(""); const [password, setPassword] = useState(""); const [showPassword, setShowPassword] = useState(false); @@ -58,12 +57,6 @@ export function LocalLoginForm() { sessionStorage.setItem("login_success_tracked", "true"); } - // Success toast - toast.success(t("login_success"), { - description: "Redirecting to dashboard", - duration: 2000, - }); - // Small delay to show success message setTimeout(() => { router.push(`/auth/callback?token=${data.access_token}`); @@ -123,7 +116,7 @@ export function LocalLoginForm() {
{/* Error Display */} - {error && error.title && ( + {error?.title && ( { From 98870a9f9a0a38cf31d02259ca36475d461db118 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sat, 7 Feb 2026 18:26:29 +0530 Subject: [PATCH 2/9] feat: implement batch indexing for Discord messages to improve efficiency and context --- .../connector_indexers/discord_indexer.py | 218 +++++++++++++----- 1 file changed, 159 insertions(+), 59 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 1595897a0..98b798452 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -1,7 +1,10 @@ """ Discord connector indexer. -Implements 2-phase document status updates for real-time UI feedback: +Implements batch indexing: groups up to DISCORD_BATCH_SIZE messages per channel +into a single document for efficient indexing and better conversational context. + +Uses 2-phase document status updates for real-time UI feedback: - Phase 1: Create all documents with 'pending' status (visible in UI immediately) - Phase 2: Process each document: pending → processing → ready/failed """ @@ -41,6 +44,72 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]] # Heartbeat interval in seconds - update notification every 30 seconds HEARTBEAT_INTERVAL_SECONDS = 30 +# Number of messages to combine into a single document for batch indexing. +# Grouping messages improves conversational context in embeddings/chunks and +# drastically reduces the number of documents, embedding calls, and DB overhead. +DISCORD_BATCH_SIZE = 100 + + +def _build_batch_document_string( + guild_name: str, + guild_id: str, + channel_name: str, + channel_id: str, + messages: list[dict], +) -> str: + """ + Combine multiple Discord messages into a single document string. + + Each message is formatted with its timestamp and author, and all messages + are concatenated into a conversation-style document. The chunker will + later split this into overlapping windows of ~8-10 consecutive messages, + preserving conversational context in each chunk's embedding. + + Args: + guild_name: Name of the Discord guild + guild_id: ID of the Discord guild + channel_name: Name of the channel + channel_id: ID of the channel + messages: List of message dicts with 'author_name', 'created_at', 'content' + + Returns: + Formatted document string with metadata and conversation content + """ + first_msg_time = messages[0].get("created_at", "Unknown") + last_msg_time = messages[-1].get("created_at", "Unknown") + + metadata_lines = [ + f"GUILD_NAME: {guild_name}", + f"GUILD_ID: {guild_id}", + f"CHANNEL_NAME: {channel_name}", + f"CHANNEL_ID: {channel_id}", + f"MESSAGE_COUNT: {len(messages)}", + f"FIRST_MESSAGE_TIME: {first_msg_time}", + f"LAST_MESSAGE_TIME: {last_msg_time}", + ] + + conversation_lines = [] + for msg in messages: + author = msg.get("author_name", "Unknown User") + timestamp = msg.get("created_at", "Unknown Time") + content = msg.get("content", "") + conversation_lines.append(f"[{timestamp}] {author}: {content}") + + metadata_sections = [ + ("METADATA", metadata_lines), + ( + "CONTENT", + [ + "FORMAT: markdown", + "TEXT_START", + "\n".join(conversation_lines), + "TEXT_END", + ], + ), + ] + + return build_document_metadata_markdown(metadata_sections) + async def index_discord_messages( session: AsyncSession, @@ -55,6 +124,12 @@ async def index_discord_messages( """ Index Discord messages from the configured guild's channels. + Messages are grouped into batches of DISCORD_BATCH_SIZE per channel, + so each document contains up to 100 consecutive messages with full + conversational context. This reduces document count, embedding calls, + and DB overhead by ~100x while improving search quality through + context-aware chunk embeddings. + Implements 2-phase document status updates for real-time UI feedback: - Phase 1: Create all documents with 'pending' status (visible in UI immediately) - Phase 2: Process each document: pending → processing → ready/failed @@ -324,6 +399,7 @@ async def index_discord_messages( documents_skipped = 0 documents_failed = 0 duplicate_content_count = 0 + total_messages_collected = 0 skipped_channels: list[str] = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck @@ -340,10 +416,12 @@ async def index_discord_messages( ) # ======================================================================= - # PHASE 1: Collect all messages and create pending documents - # This makes ALL documents visible in the UI immediately with pending status + # PHASE 1: Collect messages, group into batches, and create pending documents + # Messages are grouped into batches of DISCORD_BATCH_SIZE per channel. + # Each batch becomes a single document with full conversational context. + # All documents are visible in the UI immediately with pending status. # ======================================================================= - messages_to_process = [] # List of dicts with document and message data + batches_to_process = [] # List of dicts with document and batch data new_documents_created = False try: @@ -394,44 +472,35 @@ async def index_discord_messages( ) continue - # Process each message as an individual document (like Slack) - for msg in formatted_messages: - msg_id = msg.get("id", "") - msg_user_name = msg.get("author_name", "Unknown User") - msg_timestamp = msg.get("created_at", "Unknown Time") - msg_text = msg.get("content", "") + total_messages_collected += len(formatted_messages) - # Format document metadata (similar to Slack) - metadata_sections = [ - ( - "METADATA", - [ - f"GUILD_NAME: {guild_name}", - f"GUILD_ID: {guild_id}", - f"CHANNEL_NAME: {channel_name}", - f"CHANNEL_ID: {channel_id}", - f"MESSAGE_TIMESTAMP: {msg_timestamp}", - f"MESSAGE_USER_NAME: {msg_user_name}", - ], - ), - ( - "CONTENT", - [ - "FORMAT: markdown", - "TEXT_START", - msg_text, - "TEXT_END", - ], - ), + # ======================================================= + # Group messages into batches of DISCORD_BATCH_SIZE + # Each batch becomes a single document with conversation context + # ======================================================= + for batch_start in range( + 0, len(formatted_messages), DISCORD_BATCH_SIZE + ): + batch = formatted_messages[ + batch_start : batch_start + DISCORD_BATCH_SIZE ] - # Build the document string - combined_document_string = build_document_metadata_markdown( - metadata_sections + # Build combined document string from all messages in this batch + combined_document_string = _build_batch_document_string( + guild_name=guild_name, + guild_id=guild_id, + channel_name=channel_name, + channel_id=channel_id, + messages=batch, ) - # Generate unique identifier hash for this Discord message - unique_identifier = f"{channel_id}_{msg_id}" + # Generate unique identifier for this batch using + # channel_id + first message ID + last message ID + first_msg_id = batch[0].get("id", "") + last_msg_id = batch[-1].get("id", "") + unique_identifier = ( + f"{channel_id}_{first_msg_id}_{last_msg_id}" + ) unique_identifier_hash = generate_unique_identifier_hash( DocumentType.DISCORD_CONNECTOR, unique_identifier, @@ -464,7 +533,7 @@ async def index_discord_messages( continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append( + batches_to_process.append( { "document": existing_document, "is_new": False, @@ -474,9 +543,15 @@ async def index_discord_messages( "guild_id": guild_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": msg_id, - "message_timestamp": msg_timestamp, - "message_user_name": msg_user_name, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "first_message_time": batch[0].get( + "created_at", "Unknown" + ), + "last_message_time": batch[-1].get( + "created_at", "Unknown" + ), + "message_count": len(batch), } ) continue @@ -492,7 +567,7 @@ async def index_discord_messages( if duplicate_by_content: logger.info( - f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector " + f"Discord batch ({len(batch)} msgs) in {guild_name}#{channel_name} already indexed by another connector " f"(existing document ID: {duplicate_by_content.id}, " f"type: {duplicate_by_content.document_type}). Skipping." ) @@ -510,7 +585,9 @@ async def index_discord_messages( "guild_id": guild_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": msg_id, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "message_count": len(batch), "connector_id": connector_id, }, content="Pending...", # Placeholder until processed @@ -526,7 +603,7 @@ async def index_discord_messages( session.add(document) new_documents_created = True - messages_to_process.append( + batches_to_process.append( { "document": document, "is_new": True, @@ -536,12 +613,23 @@ async def index_discord_messages( "guild_id": guild_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": msg_id, - "message_timestamp": msg_timestamp, - "message_user_name": msg_user_name, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "first_message_time": batch[0].get( + "created_at", "Unknown" + ), + "last_message_time": batch[-1].get( + "created_at", "Unknown" + ), + "message_count": len(batch), } ) + logger.info( + f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}, " + f"grouped into {(len(formatted_messages) + DISCORD_BATCH_SIZE - 1) // DISCORD_BATCH_SIZE} batch(es)" + ) + except Exception as e: logger.error( f"Error processing guild {guild_name}: {e!s}", exc_info=True @@ -554,17 +642,20 @@ async def index_discord_messages( # Commit all pending documents - they all appear in UI now if new_documents_created: logger.info( - f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" + f"Phase 1: Committing {len([b for b in batches_to_process if b['is_new']])} pending batch documents " + f"({total_messages_collected} total messages across all channels)" ) await session.commit() # ======================================================================= - # PHASE 2: Process each document one by one + # PHASE 2: Process each batch document one by one # Each document transitions: pending → processing → ready/failed # ======================================================================= - logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + logger.info( + f"Phase 2: Processing {len(batches_to_process)} batch documents" + ) - for item in messages_to_process: + for item in batches_to_process: # Send heartbeat periodically if on_heartbeat_callback: current_time = time.time() @@ -594,9 +685,11 @@ async def index_discord_messages( "guild_id": item["guild_id"], "channel_name": item["channel_name"], "channel_id": item["channel_id"], - "message_id": item["message_id"], - "message_timestamp": item["message_timestamp"], - "message_user_name": item["message_user_name"], + "first_message_id": item["first_message_id"], + "last_message_id": item["last_message_id"], + "first_message_time": item["first_message_time"], + "last_message_time": item["last_message_time"], + "message_count": item["message_count"], "indexed_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -609,12 +702,14 @@ async def index_discord_messages( # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( - f"Committing batch: {documents_indexed} Discord messages processed so far" + f"Committing batch: {documents_indexed} batch documents processed so far" ) await session.commit() except Exception as e: - logger.error(f"Error processing Discord message: {e!s}", exc_info=True) + logger.error( + f"Error processing Discord batch document: {e!s}", exc_info=True + ) # Mark document as failed with reason (visible in UI) try: document.status = DocumentStatus.failed(str(e)) @@ -631,7 +726,8 @@ async def index_discord_messages( # Final commit for any remaining documents not yet committed in batches logger.info( - f"Final commit: Total {documents_indexed} Discord messages processed" + f"Final commit: Total {documents_indexed} batch documents processed " + f"(from {total_messages_collected} messages)" ) try: await session.commit() @@ -672,14 +768,18 @@ async def index_discord_messages( "documents_failed": documents_failed, "duplicate_content_count": duplicate_content_count, "skipped_channels_count": len(skipped_channels), + "total_messages_collected": total_messages_collected, + "batch_size": DISCORD_BATCH_SIZE, "guild_id": guild_id, "guild_name": guild_name, }, ) logger.info( - f"Discord indexing completed for guild {guild_name}: {documents_indexed} ready, {documents_skipped} skipped, " - f"{documents_failed} failed ({duplicate_content_count} duplicate content)" + f"Discord indexing completed for guild {guild_name}: " + f"{documents_indexed} batch docs ready (from {total_messages_collected} messages), " + f"{documents_skipped} skipped, {documents_failed} failed " + f"({duplicate_content_count} duplicate content)" ) return documents_indexed, warning_message From 7cede99d29c9c3508622a9ab85d3da6ebf6bb2c1 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sat, 7 Feb 2026 18:30:06 +0530 Subject: [PATCH 3/9] feat: implement batch indexing for Slack messages to enhance efficiency and conversational context --- .../tasks/connector_indexers/slack_indexer.py | 262 +++++++++++++----- 1 file changed, 200 insertions(+), 62 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index 111552fa6..decdc7b37 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -1,7 +1,10 @@ """ Slack connector indexer. -Implements 2-phase document status updates for real-time UI feedback: +Implements batch indexing: groups up to SLACK_BATCH_SIZE messages per channel +into a single document for efficient indexing and better conversational context. + +Uses 2-phase document status updates for real-time UI feedback: - Phase 1: Create all documents with 'pending' status (visible in UI immediately) - Phase 2: Process each document: pending → processing → ready/failed """ @@ -42,6 +45,72 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]] # Heartbeat interval in seconds - update notification every 30 seconds HEARTBEAT_INTERVAL_SECONDS = 30 +# Number of messages to combine into a single document for batch indexing. +# Grouping messages improves conversational context in embeddings/chunks and +# drastically reduces the number of documents, embedding calls, and DB overhead. +SLACK_BATCH_SIZE = 100 + + +def _build_batch_document_string( + team_name: str, + team_id: str, + channel_name: str, + channel_id: str, + messages: list[dict], +) -> str: + """ + Combine multiple Slack messages into a single document string. + + Each message is formatted with its timestamp and author, and all messages + are concatenated into a conversation-style document. The chunker will + later split this into overlapping windows of ~8-10 consecutive messages, + preserving conversational context in each chunk's embedding. + + Args: + team_name: Name of the Slack workspace + team_id: ID of the Slack workspace + channel_name: Name of the channel + channel_id: ID of the channel + messages: List of formatted message dicts with 'user_name', 'datetime', 'text' + + Returns: + Formatted document string with metadata and conversation content + """ + first_msg_time = messages[0].get("datetime", "Unknown") + last_msg_time = messages[-1].get("datetime", "Unknown") + + metadata_lines = [ + f"WORKSPACE_NAME: {team_name}", + f"WORKSPACE_ID: {team_id}", + f"CHANNEL_NAME: {channel_name}", + f"CHANNEL_ID: {channel_id}", + f"MESSAGE_COUNT: {len(messages)}", + f"FIRST_MESSAGE_TIME: {first_msg_time}", + f"LAST_MESSAGE_TIME: {last_msg_time}", + ] + + conversation_lines = [] + for msg in messages: + author = msg.get("user_name", "Unknown User") + timestamp = msg.get("datetime", "Unknown Time") + content = msg.get("text", "") + conversation_lines.append(f"[{timestamp}] {author}: {content}") + + metadata_sections = [ + ("METADATA", metadata_lines), + ( + "CONTENT", + [ + "FORMAT: markdown", + "TEXT_START", + "\n".join(conversation_lines), + "TEXT_END", + ], + ), + ] + + return build_document_metadata_markdown(metadata_sections) + async def index_slack_messages( session: AsyncSession, @@ -56,6 +125,16 @@ async def index_slack_messages( """ Index Slack messages from all accessible channels. + Messages are grouped into batches of SLACK_BATCH_SIZE per channel, + so each document contains up to 100 consecutive messages with full + conversational context. This reduces document count, embedding calls, + and DB overhead by ~100x while improving search quality through + context-aware chunk embeddings. + + Implements 2-phase document status updates for real-time UI feedback: + - Phase 1: Create all documents with 'pending' status (visible in UI immediately) + - Phase 2: Process each document: pending → processing → ready/failed + Args: session: Database session connector_id: ID of the Slack connector @@ -109,6 +188,10 @@ async def index_slack_messages( f"Connector with ID {connector_id} not found or is not a Slack connector", ) + # Extract workspace info from connector config + team_id = connector.config.get("team_id", "") + team_name = connector.config.get("team_name", "Unknown Workspace") + # Note: Token handling is now done automatically by SlackHistory # with auto-refresh support. We just need to pass session and connector_id. @@ -182,6 +265,8 @@ async def index_slack_messages( documents_indexed = 0 documents_skipped = 0 documents_failed = 0 # Track messages that failed processing + duplicate_content_count = 0 + total_messages_collected = 0 skipped_channels = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck @@ -194,10 +279,12 @@ async def index_slack_messages( ) # ======================================================================= - # PHASE 1: Collect all messages from all channels, create pending documents - # This makes ALL documents visible in the UI immediately with pending status + # PHASE 1: Collect messages, group into batches, and create pending documents + # Messages are grouped into batches of SLACK_BATCH_SIZE per channel. + # Each batch becomes a single document with full conversational context. + # All documents are visible in the UI immediately with pending status. # ======================================================================= - messages_to_process = [] # List of dicts with document and message data + batches_to_process = [] # List of dicts with document and batch data new_documents_created = False for channel_obj in channels: @@ -264,40 +351,39 @@ async def index_slack_messages( documents_skipped += 1 continue # Skip if no valid messages after filtering - for msg in formatted_messages: - timestamp = msg.get("datetime", "Unknown Time") - msg_ts = msg.get("ts", timestamp) # Get original Slack timestamp - msg_user_name = msg.get("user_name", "Unknown User") - msg_user_email = msg.get("user_email", "Unknown Email") - msg_text = msg.get("text", "") + total_messages_collected += len(formatted_messages) - # Format document metadata - metadata_sections = [ - ( - "METADATA", - [ - f"CHANNEL_NAME: {channel_name}", - f"CHANNEL_ID: {channel_id}", - f"MESSAGE_TIMESTAMP: {timestamp}", - f"MESSAGE_USER_NAME: {msg_user_name}", - f"MESSAGE_USER_EMAIL: {msg_user_email}", - ], - ), - ( - "CONTENT", - ["FORMAT: markdown", "TEXT_START", msg_text, "TEXT_END"], - ), + # ======================================================= + # Group messages into batches of SLACK_BATCH_SIZE + # Each batch becomes a single document with conversation context + # ======================================================= + for batch_start in range( + 0, len(formatted_messages), SLACK_BATCH_SIZE + ): + batch = formatted_messages[ + batch_start : batch_start + SLACK_BATCH_SIZE ] - # Build the document string - combined_document_string = build_document_metadata_markdown( - metadata_sections + # Build combined document string from all messages in this batch + combined_document_string = _build_batch_document_string( + team_name=team_name, + team_id=team_id, + channel_name=channel_name, + channel_id=channel_id, + messages=batch, ) - # Generate unique identifier hash for this Slack message - unique_identifier = f"{channel_id}_{msg_ts}" + # Generate unique identifier for this batch using + # channel_id + first message ts + last message ts + first_msg_ts = batch[0].get("timestamp", "") + last_msg_ts = batch[-1].get("timestamp", "") + unique_identifier = ( + f"{channel_id}_{first_msg_ts}_{last_msg_ts}" + ) unique_identifier_hash = generate_unique_identifier_hash( - DocumentType.SLACK_CONNECTOR, unique_identifier, search_space_id + DocumentType.SLACK_CONNECTOR, + unique_identifier, + search_space_id, ) # Generate content hash @@ -306,8 +392,10 @@ async def index_slack_messages( ) # Check if document with this unique identifier already exists - existing_document = await check_document_by_unique_identifier( - session, unique_identifier_hash + existing_document = ( + await check_document_by_unique_identifier( + session, unique_identifier_hash + ) ) if existing_document: @@ -317,26 +405,34 @@ async def index_slack_messages( if not DocumentStatus.is_state( existing_document.status, DocumentStatus.READY ): - existing_document.status = DocumentStatus.ready() - logger.info( - f"Document for Slack message {msg_ts} in channel {channel_name} unchanged. Skipping." - ) + existing_document.status = ( + DocumentStatus.ready() + ) documents_skipped += 1 continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append( + batches_to_process.append( { "document": existing_document, "is_new": False, "combined_document_string": combined_document_string, "content_hash": content_hash, + "team_name": team_name, + "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, - "msg_ts": msg_ts, + "first_message_ts": first_msg_ts, + "last_message_ts": last_msg_ts, + "first_message_time": batch[0].get( + "datetime", "Unknown" + ), + "last_message_time": batch[-1].get( + "datetime", "Unknown" + ), + "message_count": len(batch), "start_date": start_date_str, "end_date": end_date_str, - "message_count": len(formatted_messages), } ) continue @@ -344,28 +440,35 @@ async def index_slack_messages( # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) with session.no_autoflush: - duplicate_by_content = await check_duplicate_document_by_hash( - session, content_hash + duplicate_by_content = ( + await check_duplicate_document_by_hash( + session, content_hash + ) ) if duplicate_by_content: logger.info( - f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector " + f"Slack batch ({len(batch)} msgs) in {team_name}#{channel_name} already indexed by another connector " f"(existing document ID: {duplicate_by_content.id}, " f"type: {duplicate_by_content.document_type}). Skipping." ) + duplicate_content_count += 1 documents_skipped += 1 continue # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, - title=channel_name, + title=f"{team_name}#{channel_name}", document_type=DocumentType.SLACK_CONNECTOR, document_metadata={ + "team_name": team_name, + "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, - "msg_ts": msg_ts, + "first_message_ts": first_msg_ts, + "last_message_ts": last_msg_ts, + "message_count": len(batch), "connector_id": connector_id, }, content="Pending...", # Placeholder until processed @@ -381,23 +484,33 @@ async def index_slack_messages( session.add(document) new_documents_created = True - messages_to_process.append( + batches_to_process.append( { "document": document, "is_new": True, "combined_document_string": combined_document_string, "content_hash": content_hash, + "team_name": team_name, + "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, - "msg_ts": msg_ts, + "first_message_ts": first_msg_ts, + "last_message_ts": last_msg_ts, + "first_message_time": batch[0].get( + "datetime", "Unknown" + ), + "last_message_time": batch[-1].get( + "datetime", "Unknown" + ), + "message_count": len(batch), "start_date": start_date_str, "end_date": end_date_str, - "message_count": len(formatted_messages), } ) logger.info( - f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}" + f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}, " + f"grouped into {(len(formatted_messages) + SLACK_BATCH_SIZE - 1) // SLACK_BATCH_SIZE} batch(es)" ) except SlackApiError as slack_error: @@ -416,17 +529,20 @@ async def index_slack_messages( # Commit all pending documents - they all appear in UI now if new_documents_created: logger.info( - f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" + f"Phase 1: Committing {len([b for b in batches_to_process if b['is_new']])} pending batch documents " + f"({total_messages_collected} total messages across all channels)" ) await session.commit() # ======================================================================= - # PHASE 2: Process each document one by one + # PHASE 2: Process each batch document one by one # Each document transitions: pending → processing → ready/failed # ======================================================================= - logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + logger.info( + f"Phase 2: Processing {len(batches_to_process)} batch documents" + ) - for item in messages_to_process: + for item in batches_to_process: # Send heartbeat periodically if on_heartbeat_callback: current_time = time.time() @@ -447,16 +563,22 @@ async def index_slack_messages( ) # Update document to READY with actual content - document.title = item["channel_name"] + document.title = f"{item['team_name']}#{item['channel_name']}" document.content = item["combined_document_string"] document.content_hash = item["content_hash"] document.embedding = doc_embedding document.document_metadata = { + "team_name": item["team_name"], + "team_id": item["team_id"], "channel_name": item["channel_name"], "channel_id": item["channel_id"], + "first_message_ts": item["first_message_ts"], + "last_message_ts": item["last_message_ts"], + "first_message_time": item["first_message_time"], + "last_message_time": item["last_message_time"], + "message_count": item["message_count"], "start_date": item["start_date"], "end_date": item["end_date"], - "message_count": item["message_count"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), "connector_id": connector_id, } @@ -469,13 +591,13 @@ async def index_slack_messages( # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( - f"Committing batch: {documents_indexed} Slack messages processed so far" + f"Committing batch: {documents_indexed} batch documents processed so far" ) await session.commit() except Exception as e: logger.error( - f"Error processing Slack message {item.get('msg_ts', 'Unknown')}: {e!s}", + f"Error processing Slack batch document: {e!s}", exc_info=True, ) # Mark document as failed with reason (visible in UI) @@ -493,10 +615,15 @@ async def index_slack_messages( await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches - logger.info(f"Final commit: Total {documents_indexed} Slack messages processed") + logger.info( + f"Final commit: Total {documents_indexed} batch documents processed " + f"(from {total_messages_collected} messages)" + ) try: await session.commit() - logger.info("Successfully committed all Slack document changes to database") + logger.info( + "Successfully committed all Slack document changes to database" + ) except Exception as e: # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( @@ -514,8 +641,12 @@ async def index_slack_messages( # Build warning message if there were issues warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") if documents_failed > 0: warning_parts.append(f"{documents_failed} failed") + if skipped_channels: + warning_parts.append(f"{len(skipped_channels)} channels skipped") warning_message = ", ".join(warning_parts) if warning_parts else None # Log success @@ -527,13 +658,20 @@ async def index_slack_messages( "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, "documents_failed": documents_failed, + "duplicate_content_count": duplicate_content_count, "skipped_channels_count": len(skipped_channels), + "total_messages_collected": total_messages_collected, + "batch_size": SLACK_BATCH_SIZE, + "team_id": team_id, + "team_name": team_name, }, ) logger.info( - f"Slack indexing completed: {documents_indexed} ready, " - f"{documents_skipped} skipped, {documents_failed} failed" + f"Slack indexing completed for workspace {team_name}: " + f"{documents_indexed} batch docs ready (from {total_messages_collected} messages), " + f"{documents_skipped} skipped, {documents_failed} failed " + f"({duplicate_content_count} duplicate content)" ) return documents_indexed, warning_message From ac394e78cc152e9b42ae0928900b74bc30adbae7 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 8 Feb 2026 03:33:15 +0530 Subject: [PATCH 4/9] feat: enhance DocumentsTableShell layout and improve document type labeling in DocumentTypeIcon --- .../(manage)/components/DocumentTypeIcon.tsx | 40 +++++++++++++++++-- .../components/DocumentsTableShell.tsx | 18 ++++----- surfsense_web/components/ui/checkbox.tsx | 9 +++-- surfsense_web/lib/connectors/utils.ts | 7 +++- 4 files changed, 56 insertions(+), 18 deletions(-) diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx index 92ddb0057..1a3e343ac 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentTypeIcon.tsx @@ -10,10 +10,42 @@ export function getDocumentTypeIcon(type: string, className?: string): React.Rea } export function getDocumentTypeLabel(type: string): string { - return type - .split("_") - .map((word) => word.charAt(0) + word.slice(1).toLowerCase()) - .join(" "); + const labelMap: Record = { + EXTENSION: "Extension", + CRAWLED_URL: "Web Page", + FILE: "File", + SLACK_CONNECTOR: "Slack", + TEAMS_CONNECTOR: "Microsoft Teams", + NOTION_CONNECTOR: "Notion", + YOUTUBE_VIDEO: "YouTube Video", + GITHUB_CONNECTOR: "GitHub", + LINEAR_CONNECTOR: "Linear", + DISCORD_CONNECTOR: "Discord", + JIRA_CONNECTOR: "Jira", + CONFLUENCE_CONNECTOR: "Confluence", + CLICKUP_CONNECTOR: "ClickUp", + GOOGLE_CALENDAR_CONNECTOR: "Google Calendar", + GOOGLE_GMAIL_CONNECTOR: "Gmail", + GOOGLE_DRIVE_FILE: "Google Drive", + AIRTABLE_CONNECTOR: "Airtable", + LUMA_CONNECTOR: "Luma", + ELASTICSEARCH_CONNECTOR: "Elasticsearch", + BOOKSTACK_CONNECTOR: "BookStack", + CIRCLEBACK: "Circleback", + OBSIDIAN_CONNECTOR: "Obsidian", + SURFSENSE_DOCS: "SurfSense Docs", + NOTE: "Note", + COMPOSIO_GOOGLE_DRIVE_CONNECTOR: "Composio Google Drive", + COMPOSIO_GMAIL_CONNECTOR: "Composio Gmail", + COMPOSIO_GOOGLE_CALENDAR_CONNECTOR: "Composio Google Calendar", + }; + return ( + labelMap[type] || + type + .split("_") + .map((word) => word.charAt(0) + word.slice(1).toLowerCase()) + .join(" ") + ); } export function DocumentTypeChip({ type, className }: { type: string; className?: string }) { diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx index a44295ec0..b8ede1037 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/DocumentsTableShell.tsx @@ -354,14 +354,14 @@ export function DocumentsTableShell({ - + {columnVisibility.document_type && ( - + - )} + )} {columnVisibility.created_by && ( @@ -396,11 +396,11 @@ export function DocumentsTableShell({ - + {columnVisibility.document_type && ( - + )} @@ -499,7 +499,7 @@ export function DocumentsTableShell({ /> - + {columnVisibility.document_type && ( - + - + diff --git a/surfsense_web/components/public-chat/public-chat-view.tsx b/surfsense_web/components/public-chat/public-chat-view.tsx index 08a450d06..2755494f9 100644 --- a/surfsense_web/components/public-chat/public-chat-view.tsx +++ b/surfsense_web/components/public-chat/public-chat-view.tsx @@ -1,8 +1,8 @@ "use client"; import { AssistantRuntimeProvider } from "@assistant-ui/react"; -import { Loader2 } from "lucide-react"; import { Navbar } from "@/components/homepage/navbar"; +import { Spinner } from "@/components/ui/spinner"; import { DisplayImageToolUI } from "@/components/tool-ui/display-image"; import { GeneratePodcastToolUI } from "@/components/tool-ui/generate-podcast"; import { LinkPreviewToolUI } from "@/components/tool-ui/link-preview"; @@ -26,7 +26,7 @@ export function PublicChatView({ shareToken }: PublicChatViewProps) {
- +
); From e2dd80c60413bd1f026e6dbb50c12ed6720128cf Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 8 Feb 2026 12:43:31 +0530 Subject: [PATCH 6/9] chore: ran linting --- .../connector_indexers/discord_indexer.py | 4 +- .../tasks/connector_indexers/slack_indexer.py | 40 ++++--------- .../(manage)/components/RowActions.tsx | 6 +- surfsense_web/components/UserDropdown.tsx | 6 +- .../layout/ui/sidebar/SidebarUserProfile.tsx | 57 ++++++++----------- .../public-chat/public-chat-footer.tsx | 7 ++- 6 files changed, 48 insertions(+), 72 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 98b798452..8769d03c5 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -651,9 +651,7 @@ async def index_discord_messages( # PHASE 2: Process each batch document one by one # Each document transitions: pending → processing → ready/failed # ======================================================================= - logger.info( - f"Phase 2: Processing {len(batches_to_process)} batch documents" - ) + logger.info(f"Phase 2: Processing {len(batches_to_process)} batch documents") for item in batches_to_process: # Send heartbeat periodically diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py index decdc7b37..01771d2ac 100644 --- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py @@ -357,9 +357,7 @@ async def index_slack_messages( # Group messages into batches of SLACK_BATCH_SIZE # Each batch becomes a single document with conversation context # ======================================================= - for batch_start in range( - 0, len(formatted_messages), SLACK_BATCH_SIZE - ): + for batch_start in range(0, len(formatted_messages), SLACK_BATCH_SIZE): batch = formatted_messages[ batch_start : batch_start + SLACK_BATCH_SIZE ] @@ -377,9 +375,7 @@ async def index_slack_messages( # channel_id + first message ts + last message ts first_msg_ts = batch[0].get("timestamp", "") last_msg_ts = batch[-1].get("timestamp", "") - unique_identifier = ( - f"{channel_id}_{first_msg_ts}_{last_msg_ts}" - ) + unique_identifier = f"{channel_id}_{first_msg_ts}_{last_msg_ts}" unique_identifier_hash = generate_unique_identifier_hash( DocumentType.SLACK_CONNECTOR, unique_identifier, @@ -392,10 +388,8 @@ async def index_slack_messages( ) # Check if document with this unique identifier already exists - existing_document = ( - await check_document_by_unique_identifier( - session, unique_identifier_hash - ) + existing_document = await check_document_by_unique_identifier( + session, unique_identifier_hash ) if existing_document: @@ -405,9 +399,7 @@ async def index_slack_messages( if not DocumentStatus.is_state( existing_document.status, DocumentStatus.READY ): - existing_document.status = ( - DocumentStatus.ready() - ) + existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue @@ -440,10 +432,8 @@ async def index_slack_messages( # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) with session.no_autoflush: - duplicate_by_content = ( - await check_duplicate_document_by_hash( - session, content_hash - ) + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash ) if duplicate_by_content: @@ -496,12 +486,8 @@ async def index_slack_messages( "channel_id": channel_id, "first_message_ts": first_msg_ts, "last_message_ts": last_msg_ts, - "first_message_time": batch[0].get( - "datetime", "Unknown" - ), - "last_message_time": batch[-1].get( - "datetime", "Unknown" - ), + "first_message_time": batch[0].get("datetime", "Unknown"), + "last_message_time": batch[-1].get("datetime", "Unknown"), "message_count": len(batch), "start_date": start_date_str, "end_date": end_date_str, @@ -538,9 +524,7 @@ async def index_slack_messages( # PHASE 2: Process each batch document one by one # Each document transitions: pending → processing → ready/failed # ======================================================================= - logger.info( - f"Phase 2: Processing {len(batches_to_process)} batch documents" - ) + logger.info(f"Phase 2: Processing {len(batches_to_process)} batch documents") for item in batches_to_process: # Send heartbeat periodically @@ -621,9 +605,7 @@ async def index_slack_messages( ) try: await session.commit() - logger.info( - "Successfully committed all Slack document changes to database" - ) + logger.info("Successfully committed all Slack document changes to database") except Exception as e: # Handle any remaining integrity errors gracefully (race conditions, etc.) if ( diff --git a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx index ffb763c6b..137c02f27 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/documents/(manage)/components/RowActions.tsx @@ -51,8 +51,7 @@ export function RowActions({ document.status?.state === "pending" || document.status?.state === "processing"; // FILE documents that failed processing cannot be edited - const isFileFailed = - document.document_type === "FILE" && document.status?.state === "failed"; + const isFileFailed = document.document_type === "FILE" && document.status?.state === "failed"; // SURFSENSE_DOCS are system-managed and should not show delete at all const shouldShowDelete = !NON_DELETABLE_DOCUMENT_TYPES.includes( @@ -212,7 +211,8 @@ export function RowActions({ Delete document? - This action cannot be undone. This will permanently delete this document from your search space. + This action cannot be undone. This will permanently delete this document from your + search space. diff --git a/surfsense_web/components/UserDropdown.tsx b/surfsense_web/components/UserDropdown.tsx index 8c5bc221e..39896825e 100644 --- a/surfsense_web/components/UserDropdown.tsx +++ b/surfsense_web/components/UserDropdown.tsx @@ -98,9 +98,9 @@ export function UserDropdown({ className="text-xs md:text-sm" disabled={isLoggingOut} > - {isLoggingOut ? ( - - ) : ( + {isLoggingOut ? ( + + ) : ( )} {isLoggingOut ? "Logging out..." : "Log out"} diff --git a/surfsense_web/components/layout/ui/sidebar/SidebarUserProfile.tsx b/surfsense_web/components/layout/ui/sidebar/SidebarUserProfile.tsx index f0db30eac..2ab0b0844 100644 --- a/surfsense_web/components/layout/ui/sidebar/SidebarUserProfile.tsx +++ b/surfsense_web/components/layout/ui/sidebar/SidebarUserProfile.tsx @@ -1,15 +1,6 @@ "use client"; -import { - Check, - ChevronUp, - Languages, - Laptop, - LogOut, - Moon, - Settings, - Sun, -} from "lucide-react"; +import { Check, ChevronUp, Languages, Laptop, LogOut, Moon, Settings, Sun } from "lucide-react"; import { useTranslations } from "next-intl"; import { useState } from "react"; import { @@ -264,18 +255,18 @@ export function SidebarUserProfile({ - - {isLoggingOut ? ( - - ) : ( - - )} - {isLoggingOut ? t("loggingOut") : t("logout")} - - - - - ); + + {isLoggingOut ? ( + + ) : ( + + )} + {isLoggingOut ? t("loggingOut") : t("logout")} + + + + + ); } // Expanded view @@ -386,16 +377,16 @@ export function SidebarUserProfile({ - - {isLoggingOut ? ( - - ) : ( - - )} - {isLoggingOut ? t("loggingOut") : t("logout")} - - - - + + {isLoggingOut ? ( + + ) : ( + + )} + {isLoggingOut ? t("loggingOut") : t("logout")} + + + + ); } diff --git a/surfsense_web/components/public-chat/public-chat-footer.tsx b/surfsense_web/components/public-chat/public-chat-footer.tsx index 39099aaf7..79b317ddf 100644 --- a/surfsense_web/components/public-chat/public-chat-footer.tsx +++ b/surfsense_web/components/public-chat/public-chat-footer.tsx @@ -63,7 +63,12 @@ export function PublicChatFooter({ shareToken }: PublicChatFooterProps) { return (
- From dea66c4eb7d64820580e76429d505fcff4940e8d Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 8 Feb 2026 13:00:33 +0530 Subject: [PATCH 7/9] fix: update connector name display logic in IndexingConfigurationView for better readability --- .../views/indexing-configuration-view.tsx | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx index b885f35da..2ab25ae95 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx @@ -119,14 +119,16 @@ export const IndexingConfigurationView: FC = ({
-
- - {getConnectorTypeDisplay(connector?.connector_type || "")} Connected ! - {" "} +
+ + {getConnectorTypeDisplay(connector?.connector_type || "")} Connected ! + + {connector?.name?.includes(" - ") && ( - {getConnectorDisplayName(connector?.name || "")} + {getConnectorDisplayName(connector.name)} -
+ )} +

Configure when to start syncing your data

From e053b1b98817cfd1f543374d160e72f377b155c9 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sun, 8 Feb 2026 13:05:36 +0530 Subject: [PATCH 8/9] refactor: improve layout structure in IndexingConfigurationView for enhanced readability --- .../views/indexing-configuration-view.tsx | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx index 2ab25ae95..11760bb30 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/indexing-configuration-view.tsx @@ -119,16 +119,16 @@ export const IndexingConfigurationView: FC = ({
-
- - {getConnectorTypeDisplay(connector?.connector_type || "")} Connected ! - - {connector?.name?.includes(" - ") && ( - - {getConnectorDisplayName(connector.name)} +
+ + {getConnectorTypeDisplay(connector?.connector_type || "")} Connected ! - )} -
+ {connector?.name?.includes(" - ") && ( + + {getConnectorDisplayName(connector.name)} + + )} +

Configure when to start syncing your data

From 20ab128b055460bcc5b61214a07d2cf0c8e8745a Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Mon, 9 Feb 2026 14:31:22 +0530 Subject: [PATCH 9/9] feat: implement batch indexing for Microsoft Teams messages to improve efficiency and conversational context --- .../tasks/connector_indexers/teams_indexer.py | 262 +++++++++++++----- 1 file changed, 196 insertions(+), 66 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 1b13a2c37..cf6828268 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -1,7 +1,10 @@ """ Microsoft Teams connector indexer. -Implements 2-phase document status updates for real-time UI feedback: +Implements batch indexing: groups up to TEAMS_BATCH_SIZE messages per channel +into a single document for efficient indexing and better conversational context. + +Uses 2-phase document status updates for real-time UI feedback: - Phase 1: Create all documents with 'pending' status (visible in UI immediately) - Phase 2: Process each document: pending → processing → ready/failed """ @@ -41,6 +44,72 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]] # Heartbeat interval in seconds - update notification every 30 seconds HEARTBEAT_INTERVAL_SECONDS = 30 +# Number of messages to combine into a single document for batch indexing. +# Grouping messages improves conversational context in embeddings/chunks and +# drastically reduces the number of documents, embedding calls, and DB overhead. +TEAMS_BATCH_SIZE = 100 + + +def _build_batch_document_string( + team_name: str, + team_id: str, + channel_name: str, + channel_id: str, + messages: list[dict], +) -> str: + """ + Combine multiple Teams messages into a single document string. + + Each message is formatted with its timestamp and author, and all messages + are concatenated into a conversation-style document. The chunker will + later split this into overlapping windows of ~8-10 consecutive messages, + preserving conversational context in each chunk's embedding. + + Args: + team_name: Name of the Microsoft Team + team_id: ID of the Microsoft Team + channel_name: Name of the channel + channel_id: ID of the channel + messages: List of formatted message dicts with 'user_name', 'created_datetime', 'content' + + Returns: + Formatted document string with metadata and conversation content + """ + first_msg_time = messages[0].get("created_datetime", "Unknown") + last_msg_time = messages[-1].get("created_datetime", "Unknown") + + metadata_lines = [ + f"TEAM_NAME: {team_name}", + f"TEAM_ID: {team_id}", + f"CHANNEL_NAME: {channel_name}", + f"CHANNEL_ID: {channel_id}", + f"MESSAGE_COUNT: {len(messages)}", + f"FIRST_MESSAGE_TIME: {first_msg_time}", + f"LAST_MESSAGE_TIME: {last_msg_time}", + ] + + conversation_lines = [] + for msg in messages: + author = msg.get("user_name", "Unknown User") + timestamp = msg.get("created_datetime", "Unknown Time") + content = msg.get("content", "") + conversation_lines.append(f"[{timestamp}] {author}: {content}") + + metadata_sections = [ + ("METADATA", metadata_lines), + ( + "CONTENT", + [ + "FORMAT: markdown", + "TEXT_START", + "\n".join(conversation_lines), + "TEXT_END", + ], + ), + ] + + return build_document_metadata_markdown(metadata_sections) + async def index_teams_messages( session: AsyncSession, @@ -55,6 +124,12 @@ async def index_teams_messages( """ Index Microsoft Teams messages from all accessible teams and channels. + Messages are grouped into batches of TEAMS_BATCH_SIZE per channel, + so each document contains up to 100 consecutive messages with full + conversational context. This reduces document count, embedding calls, + and DB overhead by ~100x while improving search quality through + context-aware chunk embeddings. + Implements 2-phase document status updates for real-time UI feedback: - Phase 1: Create all documents with 'pending' status (visible in UI immediately) - Phase 2: Process each document: pending → processing → ready/failed @@ -184,6 +259,7 @@ async def index_teams_messages( documents_skipped = 0 documents_failed = 0 duplicate_content_count = 0 + total_messages_collected = 0 skipped_channels = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck @@ -199,21 +275,21 @@ async def index_teams_messages( start_datetime = None end_datetime = None if start_date_str: - # Parse as naive datetime and make it timezone-aware (UTC) start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d").replace( tzinfo=UTC ) if end_date_str: - # Parse as naive datetime, set to end of day, and make it timezone-aware (UTC) end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d").replace( hour=23, minute=59, second=59, tzinfo=UTC ) # ======================================================================= - # PHASE 1: Collect all messages and create pending documents - # This makes ALL documents visible in the UI immediately with pending status + # PHASE 1: Collect messages, group into batches, and create pending documents + # Messages are grouped into batches of TEAMS_BATCH_SIZE per channel. + # Each batch becomes a single document with full conversational context. + # All documents are visible in the UI immediately with pending status. # ======================================================================= - messages_to_process = [] # List of dicts with document and message data + batches_to_process = [] # List of dicts with document and batch data new_documents_created = False for team in teams: @@ -251,65 +327,72 @@ async def index_teams_messages( ) continue - # Process each message + # Format messages for batching + formatted_messages = [] for msg in messages: # Skip deleted messages or empty content if msg.get("deletedDateTime"): continue - # Extract message details - message_id = msg.get("id", "") - created_datetime = msg.get("createdDateTime", "") from_user = msg.get("from", {}) user_name = from_user.get("user", {}).get( "displayName", "Unknown User" ) - user_email = from_user.get("user", {}).get( - "userPrincipalName", "Unknown Email" - ) - # Extract message content body = msg.get("body", {}) - content_type = body.get("contentType", "text") msg_text = body.get("content", "") # Skip empty messages if not msg_text or msg_text.strip() == "": continue - # Format document metadata - metadata_sections = [ - ( - "METADATA", - [ - f"TEAM_NAME: {team_name}", - f"TEAM_ID: {team_id}", - f"CHANNEL_NAME: {channel_name}", - f"CHANNEL_ID: {channel_id}", - f"MESSAGE_TIMESTAMP: {created_datetime}", - f"MESSAGE_USER_NAME: {user_name}", - f"MESSAGE_USER_EMAIL: {user_email}", - f"CONTENT_TYPE: {content_type}", - ], - ), - ( - "CONTENT", - [ - f"FORMAT: {content_type}", - "TEXT_START", - msg_text, - "TEXT_END", - ], - ), - ] - - # Build the document string - combined_document_string = build_document_metadata_markdown( - metadata_sections + formatted_messages.append( + { + "message_id": msg.get("id", ""), + "created_datetime": msg.get("createdDateTime", ""), + "user_name": user_name, + "content": msg_text, + } ) - # Generate unique identifier hash for this Teams message - unique_identifier = f"{team_id}_{channel_id}_{message_id}" + if not formatted_messages: + logger.info( + "No valid messages found in channel %s of team %s after filtering.", + channel_name, + team_name, + ) + documents_skipped += 1 + continue + + total_messages_collected += len(formatted_messages) + + # ======================================================= + # Group messages into batches of TEAMS_BATCH_SIZE + # Each batch becomes a single document with conversation context + # ======================================================= + for batch_start in range( + 0, len(formatted_messages), TEAMS_BATCH_SIZE + ): + batch = formatted_messages[ + batch_start : batch_start + TEAMS_BATCH_SIZE + ] + + # Build combined document string from all messages in this batch + combined_document_string = _build_batch_document_string( + team_name=team_name, + team_id=team_id, + channel_name=channel_name, + channel_id=channel_id, + messages=batch, + ) + + # Generate unique identifier for this batch using + # team_id + channel_id + first message id + last message id + first_msg_id = batch[0].get("message_id", "") + last_msg_id = batch[-1].get("message_id", "") + unique_identifier = ( + f"{team_id}_{channel_id}_{first_msg_id}_{last_msg_id}" + ) unique_identifier_hash = generate_unique_identifier_hash( DocumentType.TEAMS_CONNECTOR, unique_identifier, @@ -331,7 +414,6 @@ async def index_teams_messages( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) if not DocumentStatus.is_state( existing_document.status, DocumentStatus.READY ): @@ -342,7 +424,7 @@ async def index_teams_messages( continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append( + batches_to_process.append( { "document": existing_document, "is_new": False, @@ -352,14 +434,21 @@ async def index_teams_messages( "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": message_id, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "first_message_time": batch[0].get( + "created_datetime", "Unknown" + ), + "last_message_time": batch[-1].get( + "created_datetime", "Unknown" + ), + "message_count": len(batch), "start_date": start_date_str, "end_date": end_date_str, } ) continue - # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) with session.no_autoflush: duplicate_by_content = ( @@ -370,9 +459,10 @@ async def index_teams_messages( if duplicate_by_content: logger.info( - "Teams message %s in channel %s already indexed by another connector " + "Teams batch (%s msgs) in %s/%s already indexed by another connector " "(existing document ID: %s, type: %s). Skipping.", - message_id, + len(batch), + team_name, channel_name, duplicate_by_content.id, duplicate_by_content.document_type, @@ -391,6 +481,9 @@ async def index_teams_messages( "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "message_count": len(batch), "connector_id": connector_id, }, content="Pending...", # Placeholder until processed @@ -406,7 +499,7 @@ async def index_teams_messages( session.add(document) new_documents_created = True - messages_to_process.append( + batches_to_process.append( { "document": document, "is_new": True, @@ -416,12 +509,30 @@ async def index_teams_messages( "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": message_id, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "first_message_time": batch[0].get( + "created_datetime", "Unknown" + ), + "last_message_time": batch[-1].get( + "created_datetime", "Unknown" + ), + "message_count": len(batch), "start_date": start_date_str, "end_date": end_date_str, } ) + logger.info( + "Phase 1: Collected %s messages from %s/%s, " + "grouped into %s batch(es)", + len(formatted_messages), + team_name, + channel_name, + (len(formatted_messages) + TEAMS_BATCH_SIZE - 1) + // TEAMS_BATCH_SIZE, + ) + except Exception as e: logger.error( "Error processing channel %s in team %s: %s", @@ -441,17 +552,20 @@ async def index_teams_messages( # Commit all pending documents - they all appear in UI now if new_documents_created: logger.info( - f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" + "Phase 1: Committing %s pending batch documents " + "(%s total messages across all channels)", + len([b for b in batches_to_process if b["is_new"]]), + total_messages_collected, ) await session.commit() # ======================================================================= - # PHASE 2: Process each document one by one + # PHASE 2: Process each batch document one by one # Each document transitions: pending → processing → ready/failed # ======================================================================= - logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + logger.info("Phase 2: Processing %s batch documents", len(batches_to_process)) - for item in messages_to_process: + for item in batches_to_process: # Send heartbeat periodically if on_heartbeat_callback: current_time = time.time() @@ -481,6 +595,11 @@ async def index_teams_messages( "team_id": item["team_id"], "channel_name": item["channel_name"], "channel_id": item["channel_id"], + "first_message_id": item["first_message_id"], + "last_message_id": item["last_message_id"], + "first_message_time": item["first_message_time"], + "last_message_time": item["last_message_time"], + "message_count": item["message_count"], "start_date": item["start_date"], "end_date": item["end_date"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), @@ -495,20 +614,25 @@ async def index_teams_messages( # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( - "Committing batch: %s Teams messages processed so far", + "Committing batch: %s Teams batch documents processed so far", documents_indexed, ) await session.commit() except Exception as e: - logger.error(f"Error processing Teams message: {e!s}", exc_info=True) + logger.error( + "Error processing Teams batch document: %s", + str(e), + exc_info=True, + ) # Mark document as failed with reason (visible in UI) try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: logger.error( - f"Failed to update document status to failed: {status_error}" + "Failed to update document status to failed: %s", + str(status_error), ) documents_failed += 1 continue @@ -518,7 +642,9 @@ async def index_teams_messages( # Final commit for any remaining documents not yet committed in batches logger.info( - "Final commit: Total %s Teams messages processed", documents_indexed + "Final commit: Total %s Teams batch documents processed (from %s messages)", + documents_indexed, + total_messages_collected, ) try: await session.commit() @@ -530,8 +656,9 @@ async def index_teams_messages( or "uniqueviolationerror" in str(e).lower() ): logger.warning( - f"Duplicate content_hash detected during final commit. " - f"Rolling back and continuing. Error: {e!s}" + "Duplicate content_hash detected during final commit. " + "Rolling back and continuing. Error: %s", + str(e), ) await session.rollback() else: @@ -557,13 +684,16 @@ async def index_teams_messages( "documents_failed": documents_failed, "duplicate_content_count": duplicate_content_count, "skipped_channels_count": len(skipped_channels), + "total_messages_collected": total_messages_collected, + "batch_size": TEAMS_BATCH_SIZE, }, ) logger.info( - "Teams indexing completed: %s ready, %s skipped, %s failed " - "(%s duplicate content)", + "Teams indexing completed: %s batch docs ready (from %s messages), " + "%s skipped, %s failed (%s duplicate content)", documents_indexed, + total_messages_collected, documents_skipped, documents_failed, duplicate_content_count,