diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 1b13a2c37..cf6828268 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -1,7 +1,10 @@ """ Microsoft Teams connector indexer. -Implements 2-phase document status updates for real-time UI feedback: +Implements batch indexing: groups up to TEAMS_BATCH_SIZE messages per channel +into a single document for efficient indexing and better conversational context. + +Uses 2-phase document status updates for real-time UI feedback: - Phase 1: Create all documents with 'pending' status (visible in UI immediately) - Phase 2: Process each document: pending → processing → ready/failed """ @@ -41,6 +44,72 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]] # Heartbeat interval in seconds - update notification every 30 seconds HEARTBEAT_INTERVAL_SECONDS = 30 +# Number of messages to combine into a single document for batch indexing. +# Grouping messages improves conversational context in embeddings/chunks and +# drastically reduces the number of documents, embedding calls, and DB overhead. +TEAMS_BATCH_SIZE = 100 + + +def _build_batch_document_string( + team_name: str, + team_id: str, + channel_name: str, + channel_id: str, + messages: list[dict], +) -> str: + """ + Combine multiple Teams messages into a single document string. + + Each message is formatted with its timestamp and author, and all messages + are concatenated into a conversation-style document. The chunker will + later split this into overlapping windows of ~8-10 consecutive messages, + preserving conversational context in each chunk's embedding. + + Args: + team_name: Name of the Microsoft Team + team_id: ID of the Microsoft Team + channel_name: Name of the channel + channel_id: ID of the channel + messages: List of formatted message dicts with 'user_name', 'created_datetime', 'content' + + Returns: + Formatted document string with metadata and conversation content + """ + first_msg_time = messages[0].get("created_datetime", "Unknown") + last_msg_time = messages[-1].get("created_datetime", "Unknown") + + metadata_lines = [ + f"TEAM_NAME: {team_name}", + f"TEAM_ID: {team_id}", + f"CHANNEL_NAME: {channel_name}", + f"CHANNEL_ID: {channel_id}", + f"MESSAGE_COUNT: {len(messages)}", + f"FIRST_MESSAGE_TIME: {first_msg_time}", + f"LAST_MESSAGE_TIME: {last_msg_time}", + ] + + conversation_lines = [] + for msg in messages: + author = msg.get("user_name", "Unknown User") + timestamp = msg.get("created_datetime", "Unknown Time") + content = msg.get("content", "") + conversation_lines.append(f"[{timestamp}] {author}: {content}") + + metadata_sections = [ + ("METADATA", metadata_lines), + ( + "CONTENT", + [ + "FORMAT: markdown", + "TEXT_START", + "\n".join(conversation_lines), + "TEXT_END", + ], + ), + ] + + return build_document_metadata_markdown(metadata_sections) + async def index_teams_messages( session: AsyncSession, @@ -55,6 +124,12 @@ async def index_teams_messages( """ Index Microsoft Teams messages from all accessible teams and channels. + Messages are grouped into batches of TEAMS_BATCH_SIZE per channel, + so each document contains up to 100 consecutive messages with full + conversational context. This reduces document count, embedding calls, + and DB overhead by ~100x while improving search quality through + context-aware chunk embeddings. + Implements 2-phase document status updates for real-time UI feedback: - Phase 1: Create all documents with 'pending' status (visible in UI immediately) - Phase 2: Process each document: pending → processing → ready/failed @@ -184,6 +259,7 @@ async def index_teams_messages( documents_skipped = 0 documents_failed = 0 duplicate_content_count = 0 + total_messages_collected = 0 skipped_channels = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck @@ -199,21 +275,21 @@ async def index_teams_messages( start_datetime = None end_datetime = None if start_date_str: - # Parse as naive datetime and make it timezone-aware (UTC) start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d").replace( tzinfo=UTC ) if end_date_str: - # Parse as naive datetime, set to end of day, and make it timezone-aware (UTC) end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d").replace( hour=23, minute=59, second=59, tzinfo=UTC ) # ======================================================================= - # PHASE 1: Collect all messages and create pending documents - # This makes ALL documents visible in the UI immediately with pending status + # PHASE 1: Collect messages, group into batches, and create pending documents + # Messages are grouped into batches of TEAMS_BATCH_SIZE per channel. + # Each batch becomes a single document with full conversational context. + # All documents are visible in the UI immediately with pending status. # ======================================================================= - messages_to_process = [] # List of dicts with document and message data + batches_to_process = [] # List of dicts with document and batch data new_documents_created = False for team in teams: @@ -251,65 +327,72 @@ async def index_teams_messages( ) continue - # Process each message + # Format messages for batching + formatted_messages = [] for msg in messages: # Skip deleted messages or empty content if msg.get("deletedDateTime"): continue - # Extract message details - message_id = msg.get("id", "") - created_datetime = msg.get("createdDateTime", "") from_user = msg.get("from", {}) user_name = from_user.get("user", {}).get( "displayName", "Unknown User" ) - user_email = from_user.get("user", {}).get( - "userPrincipalName", "Unknown Email" - ) - # Extract message content body = msg.get("body", {}) - content_type = body.get("contentType", "text") msg_text = body.get("content", "") # Skip empty messages if not msg_text or msg_text.strip() == "": continue - # Format document metadata - metadata_sections = [ - ( - "METADATA", - [ - f"TEAM_NAME: {team_name}", - f"TEAM_ID: {team_id}", - f"CHANNEL_NAME: {channel_name}", - f"CHANNEL_ID: {channel_id}", - f"MESSAGE_TIMESTAMP: {created_datetime}", - f"MESSAGE_USER_NAME: {user_name}", - f"MESSAGE_USER_EMAIL: {user_email}", - f"CONTENT_TYPE: {content_type}", - ], - ), - ( - "CONTENT", - [ - f"FORMAT: {content_type}", - "TEXT_START", - msg_text, - "TEXT_END", - ], - ), - ] - - # Build the document string - combined_document_string = build_document_metadata_markdown( - metadata_sections + formatted_messages.append( + { + "message_id": msg.get("id", ""), + "created_datetime": msg.get("createdDateTime", ""), + "user_name": user_name, + "content": msg_text, + } ) - # Generate unique identifier hash for this Teams message - unique_identifier = f"{team_id}_{channel_id}_{message_id}" + if not formatted_messages: + logger.info( + "No valid messages found in channel %s of team %s after filtering.", + channel_name, + team_name, + ) + documents_skipped += 1 + continue + + total_messages_collected += len(formatted_messages) + + # ======================================================= + # Group messages into batches of TEAMS_BATCH_SIZE + # Each batch becomes a single document with conversation context + # ======================================================= + for batch_start in range( + 0, len(formatted_messages), TEAMS_BATCH_SIZE + ): + batch = formatted_messages[ + batch_start : batch_start + TEAMS_BATCH_SIZE + ] + + # Build combined document string from all messages in this batch + combined_document_string = _build_batch_document_string( + team_name=team_name, + team_id=team_id, + channel_name=channel_name, + channel_id=channel_id, + messages=batch, + ) + + # Generate unique identifier for this batch using + # team_id + channel_id + first message id + last message id + first_msg_id = batch[0].get("message_id", "") + last_msg_id = batch[-1].get("message_id", "") + unique_identifier = ( + f"{team_id}_{channel_id}_{first_msg_id}_{last_msg_id}" + ) unique_identifier_hash = generate_unique_identifier_hash( DocumentType.TEAMS_CONNECTOR, unique_identifier, @@ -331,7 +414,6 @@ async def index_teams_messages( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: - # Ensure status is ready (might have been stuck in processing/pending) if not DocumentStatus.is_state( existing_document.status, DocumentStatus.READY ): @@ -342,7 +424,7 @@ async def index_teams_messages( continue # Queue existing document for update (will be set to processing in Phase 2) - messages_to_process.append( + batches_to_process.append( { "document": existing_document, "is_new": False, @@ -352,14 +434,21 @@ async def index_teams_messages( "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": message_id, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "first_message_time": batch[0].get( + "created_datetime", "Unknown" + ), + "last_message_time": batch[-1].get( + "created_datetime", "Unknown" + ), + "message_count": len(batch), "start_date": start_date_str, "end_date": end_date_str, } ) continue - # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) with session.no_autoflush: duplicate_by_content = ( @@ -370,9 +459,10 @@ async def index_teams_messages( if duplicate_by_content: logger.info( - "Teams message %s in channel %s already indexed by another connector " + "Teams batch (%s msgs) in %s/%s already indexed by another connector " "(existing document ID: %s, type: %s). Skipping.", - message_id, + len(batch), + team_name, channel_name, duplicate_by_content.id, duplicate_by_content.document_type, @@ -391,6 +481,9 @@ async def index_teams_messages( "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "message_count": len(batch), "connector_id": connector_id, }, content="Pending...", # Placeholder until processed @@ -406,7 +499,7 @@ async def index_teams_messages( session.add(document) new_documents_created = True - messages_to_process.append( + batches_to_process.append( { "document": document, "is_new": True, @@ -416,12 +509,30 @@ async def index_teams_messages( "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, - "message_id": message_id, + "first_message_id": first_msg_id, + "last_message_id": last_msg_id, + "first_message_time": batch[0].get( + "created_datetime", "Unknown" + ), + "last_message_time": batch[-1].get( + "created_datetime", "Unknown" + ), + "message_count": len(batch), "start_date": start_date_str, "end_date": end_date_str, } ) + logger.info( + "Phase 1: Collected %s messages from %s/%s, " + "grouped into %s batch(es)", + len(formatted_messages), + team_name, + channel_name, + (len(formatted_messages) + TEAMS_BATCH_SIZE - 1) + // TEAMS_BATCH_SIZE, + ) + except Exception as e: logger.error( "Error processing channel %s in team %s: %s", @@ -441,17 +552,20 @@ async def index_teams_messages( # Commit all pending documents - they all appear in UI now if new_documents_created: logger.info( - f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents" + "Phase 1: Committing %s pending batch documents " + "(%s total messages across all channels)", + len([b for b in batches_to_process if b["is_new"]]), + total_messages_collected, ) await session.commit() # ======================================================================= - # PHASE 2: Process each document one by one + # PHASE 2: Process each batch document one by one # Each document transitions: pending → processing → ready/failed # ======================================================================= - logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + logger.info("Phase 2: Processing %s batch documents", len(batches_to_process)) - for item in messages_to_process: + for item in batches_to_process: # Send heartbeat periodically if on_heartbeat_callback: current_time = time.time() @@ -481,6 +595,11 @@ async def index_teams_messages( "team_id": item["team_id"], "channel_name": item["channel_name"], "channel_id": item["channel_id"], + "first_message_id": item["first_message_id"], + "last_message_id": item["last_message_id"], + "first_message_time": item["first_message_time"], + "last_message_time": item["last_message_time"], + "message_count": item["message_count"], "start_date": item["start_date"], "end_date": item["end_date"], "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), @@ -495,20 +614,25 @@ async def index_teams_messages( # Batch commit every 10 documents (for ready status updates) if documents_indexed % 10 == 0: logger.info( - "Committing batch: %s Teams messages processed so far", + "Committing batch: %s Teams batch documents processed so far", documents_indexed, ) await session.commit() except Exception as e: - logger.error(f"Error processing Teams message: {e!s}", exc_info=True) + logger.error( + "Error processing Teams batch document: %s", + str(e), + exc_info=True, + ) # Mark document as failed with reason (visible in UI) try: document.status = DocumentStatus.failed(str(e)) document.updated_at = get_current_timestamp() except Exception as status_error: logger.error( - f"Failed to update document status to failed: {status_error}" + "Failed to update document status to failed: %s", + str(status_error), ) documents_failed += 1 continue @@ -518,7 +642,9 @@ async def index_teams_messages( # Final commit for any remaining documents not yet committed in batches logger.info( - "Final commit: Total %s Teams messages processed", documents_indexed + "Final commit: Total %s Teams batch documents processed (from %s messages)", + documents_indexed, + total_messages_collected, ) try: await session.commit() @@ -530,8 +656,9 @@ async def index_teams_messages( or "uniqueviolationerror" in str(e).lower() ): logger.warning( - f"Duplicate content_hash detected during final commit. " - f"Rolling back and continuing. Error: {e!s}" + "Duplicate content_hash detected during final commit. " + "Rolling back and continuing. Error: %s", + str(e), ) await session.rollback() else: @@ -557,13 +684,16 @@ async def index_teams_messages( "documents_failed": documents_failed, "duplicate_content_count": duplicate_content_count, "skipped_channels_count": len(skipped_channels), + "total_messages_collected": total_messages_collected, + "batch_size": TEAMS_BATCH_SIZE, }, ) logger.info( - "Teams indexing completed: %s ready, %s skipped, %s failed " - "(%s duplicate content)", + "Teams indexing completed: %s batch docs ready (from %s messages), " + "%s skipped, %s failed (%s duplicate content)", documents_indexed, + total_messages_collected, documents_skipped, documents_failed, duplicate_content_count,