From 0249ea20a5df4bdfdf890a2de66bac2f8a33a316 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 6 Feb 2026 03:42:03 +0530 Subject: [PATCH] feat: implement two-phase document indexing for Discord and Teams connectors with real-time status updates --- .../connector_indexers/discord_indexer.py | 384 +++++++++++------- .../tasks/connector_indexers/teams_indexer.py | 291 +++++++------ 2 files changed, 400 insertions(+), 275 deletions(-) diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index f9a6918a7..e5f333531 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -1,5 +1,9 @@ """ Discord connector indexer. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Create all documents with 'pending' status (visible in UI immediately) +- Phase 2: Process each document: pending → processing → ready/failed """ import asyncio @@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.discord_connector import DiscordConnector -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( create_document_chunks, @@ -27,6 +31,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -48,7 +53,11 @@ async def index_discord_messages( on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ - Index Discord messages from all accessible channels. + Index Discord messages from the configured guild's channels. + + Implements 2-phase document status updates for real-time UI feedback: + - Phase 1: Create all documents with 'pending' status (visible in UI immediately) + - Phase 2: Process each document: pending → processing → ready/failed Args: session: Database session @@ -113,6 +122,37 @@ async def index_discord_messages( logger.info(f"Starting Discord indexing for connector {connector_id}") + # ======================================================================= + # GUILD FILTERING: Only index the specific guild configured for this connector + # ======================================================================= + # Extract guild_id from connector config (set during OAuth flow) + configured_guild_id = connector.config.get("guild_id") + configured_guild_name = connector.config.get("guild_name") + + # Legacy connector check - if no guild_id, we need to warn and handle gracefully + is_legacy_connector = configured_guild_id is None + + if is_legacy_connector: + logger.warning( + f"Discord connector {connector_id} has no guild_id configured. " + "This is a legacy connector. Please reconnect the Discord server to fix this. " + "For now, indexing will be skipped to prevent indexing unwanted servers." + ) + await task_logger.log_task_failure( + log_entry, + f"Legacy Discord connector {connector_id} missing guild_id", + "No guild_id configured. Please reconnect this Discord server.", + {"error_type": "MissingGuildId", "is_legacy": True}, + ) + return ( + 0, + "This Discord connector needs to be reconnected. 
Please disconnect and reconnect your Discord server to enable indexing.", + ) + + logger.info( + f"Configured to index guild: {configured_guild_name} ({configured_guild_id})" + ) + # Initialize Discord client with OAuth credentials support await task_logger.log_task_progress( log_entry, @@ -255,77 +295,68 @@ async def index_discord_messages( try: await task_logger.log_task_progress( log_entry, - f"Starting Discord bot and fetching guilds for connector {connector_id}", - {"stage": "fetch_guilds"}, + f"Starting Discord bot for connector {connector_id}", + {"stage": "bot_initialization"}, ) - logger.info("Starting Discord bot to fetch guilds") + logger.info("Starting Discord bot") discord_client._bot_task = asyncio.create_task(discord_client.start_bot()) await discord_client._wait_until_ready() - logger.info("Fetching Discord guilds") - guilds = await discord_client.get_guilds() - logger.info(f"Found {len(guilds)} guilds") + # We only process the configured guild, not all guilds + logger.info( + f"Processing configured guild only: {configured_guild_name} ({configured_guild_id})" + ) + except Exception as e: await task_logger.log_task_failure( log_entry, - f"Failed to get Discord guilds for connector {connector_id}", + f"Failed to start Discord bot for connector {connector_id}", str(e), - {"error_type": "GuildFetchError"}, + {"error_type": "BotStartError"}, ) - logger.error(f"Failed to get Discord guilds: {e!s}", exc_info=True) + logger.error(f"Failed to start Discord bot: {e!s}", exc_info=True) await discord_client.close_bot() - return 0, f"Failed to get Discord guilds: {e!s}" - - if not guilds: - await task_logger.log_task_success( - log_entry, - f"No Discord guilds found for connector {connector_id}", - {"guilds_found": 0}, - ) - logger.info("No Discord guilds found to index") - await discord_client.close_bot() - return 0, "No Discord guilds found" + return 0, f"Failed to start Discord bot: {e!s}" # Track results documents_indexed = 0 documents_skipped = 0 + documents_failed = 0 + duplicate_content_count = 0 skipped_channels: list[str] = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck last_heartbeat_time = time.time() - # Process each guild and channel + # Use the configured guild info + guild_id = configured_guild_id + guild_name = configured_guild_name or "Unknown Guild" + await task_logger.log_task_progress( log_entry, - f"Starting to process {len(guilds)} Discord guilds", - {"stage": "process_guilds", "total_guilds": len(guilds)}, + f"Processing Discord guild: {guild_name}", + {"stage": "process_guild", "guild_id": guild_id, "guild_name": guild_name}, ) + # ======================================================================= + # PHASE 1: Collect all messages and create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + messages_to_process = [] # List of dicts with document and message data + new_documents_created = False + try: - for guild in guilds: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) - >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() - guild_id = guild["id"] - guild_name = guild["name"] - logger.info(f"Processing guild: {guild_name} ({guild_id})") - - try: - channels = await discord_client.get_text_channels(guild_id) - if not channels: - logger.info( - f"No channels found 
in guild {guild_name}. Skipping." - ) - skipped_channels.append(f"{guild_name} (no channels)") - documents_skipped += 1 - continue + logger.info(f"Processing guild: {guild_name} ({guild_id})") + try: + channels = await discord_client.get_text_channels(guild_id) + if not channels: + logger.info( + f"No channels found in guild {guild_name}. Skipping." + ) + skipped_channels.append(f"{guild_name} (no channels)") + else: for channel in channels: channel_id = channel["id"] channel_name = channel["name"] @@ -343,14 +374,12 @@ async def index_discord_messages( skipped_channels.append( f"{guild_name}#{channel_name} (fetch error)" ) - documents_skipped += 1 continue if not messages: logger.info( f"No messages found in channel {channel_name} for the specified date range." ) - documents_skipped += 1 continue # Filter/format messages @@ -365,7 +394,6 @@ async def index_discord_messages( logger.info( f"No valid messages found in channel {channel_name} after filtering." ) - documents_skipped += 1 continue # Process each message as an individual document (like Slack) @@ -427,55 +455,27 @@ async def index_discord_messages( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: - logger.info( - f"Document for Discord message {msg_id} in {guild_name}#{channel_name} unchanged. Skipping." - ) + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - f"Content changed for Discord message {msg_id} in {guild_name}#{channel_name}. Updating document." - ) - # Update chunks and embedding - chunks = await create_document_chunks( - combined_document_string - ) - doc_embedding = ( - config.embedding_model_instance.embed( - combined_document_string - ) - ) - - # Update existing document - existing_document.content = combined_document_string - existing_document.content_hash = content_hash - existing_document.embedding = doc_embedding - existing_document.document_metadata = { - "guild_name": guild_name, - "guild_id": guild_id, - "channel_name": channel_name, - "channel_id": channel_id, - "message_id": msg_id, - "message_timestamp": msg_timestamp, - "message_user_name": msg_user_name, - "indexed_at": datetime.now(UTC).strftime( - "%Y-%m-%d %H:%M:%S" - ), - } - - # Delete old chunks and add new ones - existing_document.chunks = chunks - existing_document.updated_at = ( - get_current_timestamp() - ) - - documents_indexed += 1 - logger.info( - f"Successfully updated Discord message {msg_id}" - ) - continue + # Queue existing document for update (will be set to processing in Phase 2) + messages_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'combined_document_string': combined_document_string, + 'content_hash': content_hash, + 'guild_name': guild_name, + 'guild_id': guild_id, + 'channel_name': channel_name, + 'channel_id': channel_id, + 'message_id': msg_id, + 'message_timestamp': msg_timestamp, + 'message_user_name': msg_user_name, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -492,19 +492,11 @@ async def index_discord_messages( f"(existing document ID: {duplicate_by_content.id}, " f"type: {duplicate_by_content.document_type}). Skipping." 
) + duplicate_content_count += 1 documents_skipped += 1 continue - # Document doesn't exist - create new one - # Process chunks - chunks = await create_document_chunks( - combined_document_string - ) - doc_embedding = config.embedding_model_instance.embed( - combined_document_string - ) - - # Create and store new document + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=f"{guild_name}#{channel_name}", @@ -515,87 +507,171 @@ async def index_discord_messages( "channel_name": channel_name, "channel_id": channel_id, "message_id": msg_id, - "message_timestamp": msg_timestamp, - "message_user_name": msg_user_name, - "indexed_at": datetime.now(UTC).strftime( - "%Y-%m-%d %H:%M:%S" - ), + "connector_id": connector_id, }, - content=combined_document_string, - embedding=doc_embedding, - chunks=chunks, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 + new_documents_created = True - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - f"Committing batch: {documents_indexed} Discord messages processed so far" - ) - await session.commit() + messages_to_process.append({ + 'document': document, + 'is_new': True, + 'combined_document_string': combined_document_string, + 'content_hash': content_hash, + 'guild_name': guild_name, + 'guild_id': guild_id, + 'channel_name': channel_name, + 'channel_id': channel_id, + 'message_id': msg_id, + 'message_timestamp': msg_timestamp, + 'message_user_name': msg_user_name, + }) - logger.info( - f"Successfully indexed channel {guild_name}#{channel_name} with {len(formatted_messages)} messages" - ) + except Exception as e: + logger.error( + f"Error processing guild {guild_name}: {e!s}", exc_info=True + ) + skipped_channels.append(f"{guild_name} (processing error)") - except Exception as e: - logger.error( - f"Error processing guild {guild_name}: {e!s}", exc_info=True - ) - skipped_channels.append(f"{guild_name} (processing error)") - documents_skipped += 1 - continue finally: await discord_client.close_bot() - # Update last_indexed_at only if we indexed at least one - if documents_indexed > 0: - await update_connector_last_indexed(session, connector, update_last_indexed) + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + + for item in messages_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = 
item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (embeddings, chunks) + chunks = await create_document_chunks(item['combined_document_string']) + doc_embedding = config.embedding_model_instance.embed( + item['combined_document_string'] + ) + + # Update document to READY with actual content + document.title = f"{item['guild_name']}#{item['channel_name']}" + document.content = item['combined_document_string'] + document.content_hash = item['content_hash'] + document.embedding = doc_embedding + document.document_metadata = { + "guild_name": item['guild_name'], + "guild_id": item['guild_id'], + "channel_name": item['channel_name'], + "channel_id": item['channel_id'], + "message_id": item['message_id'], + "message_timestamp": item['message_timestamp'], + "message_user_name": item['message_user_name'], + "indexed_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) + if documents_indexed % 10 == 0: + logger.info( + f"Committing batch: {documents_indexed} Discord messages processed so far" + ) + await session.commit() + + except Exception as e: + logger.error(f"Error processing Discord message: {e!s}", exc_info=True) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 + continue + + # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches logger.info( f"Final commit: Total {documents_indexed} Discord messages processed" ) - await session.commit() - - # Prepare result message - result_message = None - if skipped_channels: - result_message = ( - f"Processed {documents_indexed} messages. Skipped {len(skipped_channels)} channels: " - + ", ".join(skipped_channels) + try: + await session.commit() + logger.info( + "Successfully committed all Discord document changes to database" ) - else: - result_message = f"Processed {documents_indexed} messages." + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"Rolling back and continuing. 
Error: {e!s}" + ) + await session.rollback() + else: + raise + + # Build warning message if there were issues + warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") + if skipped_channels: + warning_parts.append(f"{len(skipped_channels)} channels skipped") + warning_message = ", ".join(warning_parts) if warning_parts else None # Log success await task_logger.log_task_success( log_entry, f"Successfully completed Discord indexing for connector {connector_id}", { - "messages_processed": documents_indexed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "documents_failed": documents_failed, + "duplicate_content_count": duplicate_content_count, "skipped_channels_count": len(skipped_channels), - "guilds_processed": len(guilds), - "result_message": result_message, + "guild_id": guild_id, + "guild_name": guild_name, }, ) logger.info( - f"Discord indexing completed: {documents_indexed} new messages, {documents_skipped} skipped" + f"Discord indexing completed for guild {guild_name}: {documents_indexed} ready, {documents_skipped} skipped, " + f"{documents_failed} failed ({duplicate_content_count} duplicate content)" ) - return ( - documents_indexed, - None, - ) # Return None on success (result_message is for logging only) + return documents_indexed, warning_message except SQLAlchemyError as db_error: await session.rollback() diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index d42c5b7f1..27259fd6f 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -1,17 +1,21 @@ """ Microsoft Teams connector indexer. + +Implements 2-phase document status updates for real-time UI feedback: +- Phase 1: Create all documents with 'pending' status (visible in UI immediately) +- Phase 2: Process each document: pending → processing → ready/failed """ import time from collections.abc import Awaitable, Callable -from datetime import UTC +from datetime import UTC, datetime from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from app.config import config from app.connectors.teams_history import TeamsHistory -from app.db import Document, DocumentType, SearchSourceConnectorType +from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType from app.services.task_logging_service import TaskLoggingService from app.utils.document_converters import ( create_document_chunks, @@ -27,6 +31,7 @@ from .base import ( get_connector_by_id, get_current_timestamp, logger, + safe_set_chunks, update_connector_last_indexed, ) @@ -50,6 +55,10 @@ async def index_teams_messages( """ Index Microsoft Teams messages from all accessible teams and channels. 
+ Implements 2-phase document status updates for real-time UI feedback: + - Phase 1: Create all documents with 'pending' status (visible in UI immediately) + - Phase 2: Process each document: pending → processing → ready/failed + Args: session: Database session connector_id: ID of the Teams connector @@ -165,11 +174,16 @@ async def index_teams_messages( f"No Teams found for connector {connector_id}", {"teams_found": 0}, ) - return 0, "No Teams found" + # CRITICAL: Update timestamp even when no teams found so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) + await session.commit() + return 0, None # Return None (not error) when no items found # Track the number of documents indexed documents_indexed = 0 documents_skipped = 0 + documents_failed = 0 + duplicate_content_count = 0 skipped_channels = [] # Heartbeat tracking - update notification periodically to prevent appearing stuck @@ -182,8 +196,6 @@ async def index_teams_messages( ) # Convert date strings to datetime objects for filtering - from datetime import datetime - start_datetime = None end_datetime = None if start_date_str: @@ -197,16 +209,14 @@ async def index_teams_messages( hour=23, minute=59, second=59, tzinfo=UTC ) - # Process each team - for team in teams: - # Check if it's time for a heartbeat update - if ( - on_heartbeat_callback - and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS - ): - await on_heartbeat_callback(documents_indexed) - last_heartbeat_time = time.time() + # ======================================================================= + # PHASE 1: Collect all messages and create pending documents + # This makes ALL documents visible in the UI immediately with pending status + # ======================================================================= + messages_to_process = [] # List of dicts with document and message data + new_documents_created = False + for team in teams: team_id = team.get("id") team_name = team.get("displayName", "Unknown Team") @@ -239,7 +249,6 @@ async def index_teams_messages( channel_name, team_name, ) - documents_skipped += 1 continue # Process each message @@ -322,60 +331,27 @@ async def index_teams_messages( if existing_document: # Document exists - check if content has changed if existing_document.content_hash == content_hash: - logger.info( - "Document for Teams message %s in channel %s unchanged. Skipping.", - message_id, - channel_name, - ) + # Ensure status is ready (might have been stuck in processing/pending) + if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY): + existing_document.status = DocumentStatus.ready() documents_skipped += 1 continue - else: - # Content has changed - update the existing document - logger.info( - "Content changed for Teams message %s in channel %s. 
Updating document.", - message_id, - channel_name, - ) - # Update chunks and embedding - chunks = await create_document_chunks( - combined_document_string - ) - doc_embedding = ( - config.embedding_model_instance.embed( - combined_document_string - ) - ) - - # Update existing document - existing_document.content = combined_document_string - existing_document.content_hash = content_hash - existing_document.embedding = doc_embedding - existing_document.document_metadata = { - "team_name": team_name, - "team_id": team_id, - "channel_name": channel_name, - "channel_id": channel_id, - "start_date": start_date_str, - "end_date": end_date_str, - "message_count": len(messages), - "indexed_at": datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ), - } - - # Delete old chunks and add new ones - existing_document.chunks = chunks - existing_document.updated_at = ( - get_current_timestamp() - ) - - documents_indexed += 1 - logger.info( - "Successfully updated Teams message %s", - message_id, - ) - continue + # Queue existing document for update (will be set to processing in Phase 2) + messages_to_process.append({ + 'document': existing_document, + 'is_new': False, + 'combined_document_string': combined_document_string, + 'content_hash': content_hash, + 'team_name': team_name, + 'team_id': team_id, + 'channel_name': channel_name, + 'channel_id': channel_id, + 'message_id': message_id, + 'start_date': start_date_str, + 'end_date': end_date_str, + }) + continue # Document doesn't exist by unique_identifier_hash # Check if a document with the same content_hash exists (from another connector) @@ -395,19 +371,11 @@ async def index_teams_messages( duplicate_by_content.id, duplicate_by_content.document_type, ) + duplicate_content_count += 1 documents_skipped += 1 continue - # Document doesn't exist - create new one - # Process chunks - chunks = await create_document_chunks( - combined_document_string - ) - doc_embedding = config.embedding_model_instance.embed( - combined_document_string - ) - - # Create and store new document + # Create new document with PENDING status (visible in UI immediately) document = Document( search_space_id=search_space_id, title=f"{team_name} - {channel_name}", @@ -417,40 +385,34 @@ async def index_teams_messages( "team_id": team_id, "channel_name": channel_name, "channel_id": channel_id, - "start_date": start_date_str, - "end_date": end_date_str, - "message_count": len(messages), - "indexed_at": datetime.now().strftime( - "%Y-%m-%d %H:%M:%S" - ), + "connector_id": connector_id, }, - content=combined_document_string, - embedding=doc_embedding, - chunks=chunks, - content_hash=content_hash, + content="Pending...", # Placeholder until processed + content_hash=unique_identifier_hash, # Temporary unique value - updated when ready unique_identifier_hash=unique_identifier_hash, + embedding=None, + chunks=[], # Empty at creation - safe for async + status=DocumentStatus.pending(), # Pending until processing starts updated_at=get_current_timestamp(), created_by_id=user_id, connector_id=connector_id, ) - session.add(document) - documents_indexed += 1 + new_documents_created = True - # Batch commit every 10 documents - if documents_indexed % 10 == 0: - logger.info( - "Committing batch: %s Teams messages processed so far", - documents_indexed, - ) - await session.commit() - - logger.info( - "Successfully indexed channel %s in team %s with %s messages", - channel_name, - team_name, - len(messages), - ) + messages_to_process.append({ + 'document': document, + 'is_new': True, + 
'combined_document_string': combined_document_string, + 'content_hash': content_hash, + 'team_name': team_name, + 'team_id': team_id, + 'channel_name': channel_name, + 'channel_id': channel_id, + 'message_id': message_id, + 'start_date': start_date_str, + 'end_date': end_date_str, + }) except Exception as e: logger.error( @@ -462,54 +424,141 @@ async def index_teams_messages( skipped_channels.append( f"{team_name}/{channel_name} (processing error)" ) - documents_skipped += 1 continue except Exception as e: logger.error("Error processing team %s: %s", team_name, str(e)) continue - # Update the last_indexed_at timestamp for the connector only if requested - # and if we successfully indexed at least one document - total_processed = documents_indexed - if total_processed > 0: - await update_connector_last_indexed(session, connector, update_last_indexed) + # Commit all pending documents - they all appear in UI now + if new_documents_created: + logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents") + await session.commit() + + # ======================================================================= + # PHASE 2: Process each document one by one + # Each document transitions: pending → processing → ready/failed + # ======================================================================= + logger.info(f"Phase 2: Processing {len(messages_to_process)} documents") + + for item in messages_to_process: + # Send heartbeat periodically + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + + document = item['document'] + try: + # Set to PROCESSING and commit - shows "processing" in UI for THIS document only + document.status = DocumentStatus.processing() + await session.commit() + + # Heavy processing (embeddings, chunks) + chunks = await create_document_chunks(item['combined_document_string']) + doc_embedding = config.embedding_model_instance.embed( + item['combined_document_string'] + ) + + # Update document to READY with actual content + document.title = f"{item['team_name']} - {item['channel_name']}" + document.content = item['combined_document_string'] + document.content_hash = item['content_hash'] + document.embedding = doc_embedding + document.document_metadata = { + "team_name": item['team_name'], + "team_id": item['team_id'], + "channel_name": item['channel_name'], + "channel_id": item['channel_id'], + "start_date": item['start_date'], + "end_date": item['end_date'], + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "connector_id": connector_id, + } + safe_set_chunks(document, chunks) + document.updated_at = get_current_timestamp() + document.status = DocumentStatus.ready() + + documents_indexed += 1 + + # Batch commit every 10 documents (for ready status updates) + if documents_indexed % 10 == 0: + logger.info( + "Committing batch: %s Teams messages processed so far", + documents_indexed, + ) + await session.commit() + + except Exception as e: + logger.error(f"Error processing Teams message: {e!s}", exc_info=True) + # Mark document as failed with reason (visible in UI) + try: + document.status = DocumentStatus.failed(str(e)) + document.updated_at = get_current_timestamp() + except Exception as status_error: + logger.error(f"Failed to update document status to failed: {status_error}") + documents_failed += 1 + continue + + # CRITICAL: Always update timestamp (even if 0 
documents indexed) so Electric SQL syncs + await update_connector_last_indexed(session, connector, update_last_indexed) # Final commit for any remaining documents not yet committed in batches logger.info( "Final commit: Total %s Teams messages processed", documents_indexed ) - await session.commit() + try: + await session.commit() + logger.info( + "Successfully committed all Teams document changes to database" + ) + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if ( + "duplicate key value violates unique constraint" in str(e).lower() + or "uniqueviolationerror" in str(e).lower() + ): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + else: + raise - # Prepare result message - result_message = None + # Build warning message if there were issues + warning_parts = [] + if duplicate_content_count > 0: + warning_parts.append(f"{duplicate_content_count} duplicate") + if documents_failed > 0: + warning_parts.append(f"{documents_failed} failed") if skipped_channels: - result_message = f"Processed {total_processed} messages. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}" - else: - result_message = f"Processed {total_processed} messages." + warning_parts.append(f"{len(skipped_channels)} channels skipped") + warning_message = ", ".join(warning_parts) if warning_parts else None # Log success await task_logger.log_task_success( log_entry, f"Successfully completed Teams indexing for connector {connector_id}", { - "messages_processed": total_processed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "documents_failed": documents_failed, + "duplicate_content_count": duplicate_content_count, "skipped_channels_count": len(skipped_channels), - "result_message": result_message, }, ) logger.info( - "Teams indexing completed: %s new messages, %s skipped", + "Teams indexing completed: %s ready, %s skipped, %s failed " + "(%s duplicate content)", documents_indexed, documents_skipped, + documents_failed, + duplicate_content_count, ) - return ( - total_processed, - None, - ) # Return None on success (result_message is for logging only) + return documents_indexed, warning_message except SQLAlchemyError as db_error: await session.rollback()
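The lifecycle both indexers now share reduces to a small state machine. Below is a minimal, self-contained sketch of it: DocumentStatus.pending()/processing()/ready()/failed(reason) and the per-step session commits are stood in for by plain strings and an in-memory list, so only the two-phase flow itself is real. (safe_set_chunks and the empty chunks=[] at creation presumably exist to avoid lazy-load surprises on the async relationship; the sketch sidesteps persistence entirely.)

from dataclasses import dataclass

@dataclass
class Doc:
    title: str
    status: str = "pending"      # Phase 1: every document starts pending
    content: str = "Pending..."  # placeholder, as in the patch
    error: str | None = None

def expensive_process(text: str) -> str:
    # Stand-in for create_document_chunks + embedding in the real code.
    if not text:
        raise ValueError("empty message")
    return text.upper()

def phase1_collect(messages: list[dict]) -> list[Doc]:
    # Phase 1: create one pending document per message and commit them all,
    # so every document is visible in the UI before any heavy work starts.
    return [Doc(title=m["title"]) for m in messages]

def phase2_process(docs: list[Doc], messages: list[dict]) -> None:
    # Phase 2: one document at a time, pending -> processing -> ready/failed.
    # The real code commits after each status flip so the UI tracks progress.
    for doc, msg in zip(docs, messages):
        doc.status = "processing"
        try:
            doc.content = expensive_process(msg["text"])
            doc.status = "ready"
        except Exception as e:
            doc.status = "failed"   # failure reason stays visible per document
            doc.error = str(e)

msgs = [{"title": "general", "text": "hello"}, {"title": "random", "text": ""}]
docs = phase1_collect(msgs)
phase2_process(docs, msgs)
for d in docs:
    print(d.title, d.status, d.error)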
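Both indexers seed content_hash with the unique_identifier_hash at creation time. Assuming content_hash carries a unique constraint (the duplicate-key handling around the final commit suggests it does), this keeps many simultaneous "Pending..." rows from colliding before their real hashes exist. A sketch of the trick, with sha256 standing in for the app's hash helpers (the helper names below are illustrative, not the real API):

from hashlib import sha256

def identifier_hash(connector_id: int, message_id: str) -> str:
    # Assumed analogue of the app's unique-identifier helper: stable per message.
    return sha256(f"DISCORD:{connector_id}:{message_id}".encode()).hexdigest()

def real_content_hash(text: str) -> str:
    return sha256(text.encode()).hexdigest()

uid = identifier_hash(42, "1234567890")
row = {"unique_identifier_hash": uid, "content_hash": uid, "content": "Pending..."}
# Phase 2 swaps in the real values once chunking/embedding succeeds:
rendered = "user_a: hello from #general"
row.update(content=rendered, content_hash=real_content_hash(rendered))
print(row["content_hash"] != row["unique_identifier_hash"])  # True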
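The heartbeat throttle is identical in both Phase 2 loops and could be expressed generically. A sketch, with 30 seconds as an assumed value for HEARTBEAT_INTERVAL_SECONDS (the real constant is imported from .base):

import asyncio
import time

HEARTBEAT_INTERVAL_SECONDS = 30  # assumed; the real value comes from .base

async def run_with_heartbeat(items, process, on_heartbeat=None):
    # At most one callback per interval, so long Phase 2 loops keep the task
    # notification fresh without emitting a callback per document.
    done = 0
    last = time.time()
    for item in items:
        if on_heartbeat and time.time() - last >= HEARTBEAT_INTERVAL_SECONDS:
            await on_heartbeat(done)
            last = time.time()
        await process(item)
        done += 1
    return done

async def main():
    async def work(item):  # stand-in for chunk + embed + commit
        await asyncio.sleep(0)
    print(await run_with_heartbeat(range(5), work))

asyncio.run(main())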
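The duplicate-tolerant final commit also appears verbatim in both files and is a candidate for a shared helper. A sketch that keeps the patch's string matching (catching sqlalchemy.exc.IntegrityError directly would be tighter, but the string check is what the patch actually inspects, so it is preserved here):

import logging

logger = logging.getLogger(__name__)

async def commit_tolerating_duplicate_hash(session) -> bool:
    # Same shape as the patch's final commit: a unique-constraint race
    # (e.g. two connectors indexing identical content) is logged and rolled
    # back rather than failing the whole indexing run; anything else re-raises.
    try:
        await session.commit()
        return True
    except Exception as e:
        msg = str(e).lower()
        if ("duplicate key value violates unique constraint" in msg
                or "uniqueviolationerror" in msg):
            logger.warning(
                "Duplicate content_hash at final commit; rolling back: %s", e
            )
            await session.rollback()
            return False
        raise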