feat: implement two-phase document indexing for Discord and Teams connectors with real-time status updates

Anish Sarkar 2026-02-06 03:42:03 +05:30
parent 2077344934
commit 0249ea20a5
2 changed files with 400 additions and 275 deletions
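
Note: the `DocumentStatus` helpers this commit leans on (`pending()`, `processing()`, `ready()`, `failed(reason)`, `is_state(...)`) live in `app.db` and are not part of this diff. A minimal sketch of the lifecycle the commit message describes, with assumed names and signatures, might look like:

# Illustrative sketch only - the real DocumentStatus in app.db is not shown
# in this diff; every name below is an assumption.
from dataclasses import dataclass


@dataclass
class DocumentStatus:
    PENDING = "pending"
    PROCESSING = "processing"
    READY = "ready"
    FAILED = "failed"

    state: str
    reason: str | None = None

    @classmethod
    def pending(cls) -> "DocumentStatus":
        return cls(cls.PENDING)

    @classmethod
    def processing(cls) -> "DocumentStatus":
        return cls(cls.PROCESSING)

    @classmethod
    def ready(cls) -> "DocumentStatus":
        return cls(cls.READY)

    @classmethod
    def failed(cls, reason: str) -> "DocumentStatus":
        return cls(cls.FAILED, reason)

    @staticmethod
    def is_state(status: "DocumentStatus", state: str) -> bool:
        return status is not None and status.state == state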


@@ -1,5 +1,9 @@
 """
 Discord connector indexer.
+
+Implements 2-phase document status updates for real-time UI feedback:
+- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
+- Phase 2: Process each document: pending → processing → ready/failed
 """

 import asyncio

@@ -12,7 +16,7 @@ from sqlalchemy.ext.asyncio import AsyncSession
 from app.config import config
 from app.connectors.discord_connector import DiscordConnector
-from app.db import Document, DocumentType, SearchSourceConnectorType
+from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
 from app.services.task_logging_service import TaskLoggingService
 from app.utils.document_converters import (
     create_document_chunks,

@@ -27,6 +31,7 @@ from .base import (
     get_connector_by_id,
     get_current_timestamp,
     logger,
+    safe_set_chunks,
     update_connector_last_indexed,
 )

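
`safe_set_chunks` is imported from `.base` but not defined in this diff. Judging by its use in Phase 2 below, it swaps a document's chunk collection without the pitfalls of plain assignment on an async session; a rough sketch under that assumption:

# Hypothetical sketch - the real safe_set_chunks in .base may differ.
def safe_set_chunks(document, chunks):
    # Mutate the already-loaded collection in place so SQLAlchemy's unit of
    # work records deletes for the old chunks and inserts for the new ones,
    # instead of rebinding the attribute (which can trigger a lazy load on
    # an AsyncSession).
    document.chunks[:] = list(chunks)
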
@@ -48,7 +53,11 @@ async def index_discord_messages(
     on_heartbeat_callback: HeartbeatCallbackType | None = None,
 ) -> tuple[int, str | None]:
     """
-    Index Discord messages from all accessible channels.
+    Index Discord messages from the configured guild's channels.
+
+    Implements 2-phase document status updates for real-time UI feedback:
+    - Phase 1: Create all documents with 'pending' status (visible in UI immediately)
+    - Phase 2: Process each document: pending → processing → ready/failed

     Args:
         session: Database session

@@ -113,6 +122,37 @@ async def index_discord_messages(
     logger.info(f"Starting Discord indexing for connector {connector_id}")

+    # =======================================================================
+    # GUILD FILTERING: Only index the specific guild configured for this connector
+    # =======================================================================
+    # Extract guild_id from connector config (set during OAuth flow)
+    configured_guild_id = connector.config.get("guild_id")
+    configured_guild_name = connector.config.get("guild_name")
+
+    # Legacy connector check - if no guild_id, we need to warn and handle gracefully
+    is_legacy_connector = configured_guild_id is None
+    if is_legacy_connector:
+        logger.warning(
+            f"Discord connector {connector_id} has no guild_id configured. "
+            "This is a legacy connector. Please reconnect the Discord server to fix this. "
+            "For now, indexing will be skipped to prevent indexing unwanted servers."
+        )
+        await task_logger.log_task_failure(
+            log_entry,
+            f"Legacy Discord connector {connector_id} missing guild_id",
+            "No guild_id configured. Please reconnect this Discord server.",
+            {"error_type": "MissingGuildId", "is_legacy": True},
+        )
+        return (
+            0,
+            "This Discord connector needs to be reconnected. Please disconnect and reconnect your Discord server to enable indexing.",
+        )
+
+    logger.info(
+        f"Configured to index guild: {configured_guild_name} ({configured_guild_id})"
+    )
+
     # Initialize Discord client with OAuth credentials support
     await task_logger.log_task_progress(
         log_entry,

@@ -255,77 +295,68 @@ async def index_discord_messages(
     try:
         await task_logger.log_task_progress(
             log_entry,
-            f"Starting Discord bot and fetching guilds for connector {connector_id}",
-            {"stage": "fetch_guilds"},
+            f"Starting Discord bot for connector {connector_id}",
+            {"stage": "bot_initialization"},
         )
-        logger.info("Starting Discord bot to fetch guilds")
+        logger.info("Starting Discord bot")
         discord_client._bot_task = asyncio.create_task(discord_client.start_bot())
         await discord_client._wait_until_ready()
-        logger.info("Fetching Discord guilds")
-        guilds = await discord_client.get_guilds()
-        logger.info(f"Found {len(guilds)} guilds")
+        # We only process the configured guild, not all guilds
+        logger.info(
+            f"Processing configured guild only: {configured_guild_name} ({configured_guild_id})"
+        )
     except Exception as e:
         await task_logger.log_task_failure(
             log_entry,
-            f"Failed to get Discord guilds for connector {connector_id}",
+            f"Failed to start Discord bot for connector {connector_id}",
             str(e),
-            {"error_type": "GuildFetchError"},
+            {"error_type": "BotStartError"},
         )
-        logger.error(f"Failed to get Discord guilds: {e!s}", exc_info=True)
+        logger.error(f"Failed to start Discord bot: {e!s}", exc_info=True)
         await discord_client.close_bot()
-        return 0, f"Failed to get Discord guilds: {e!s}"
-
-    if not guilds:
-        await task_logger.log_task_success(
-            log_entry,
-            f"No Discord guilds found for connector {connector_id}",
-            {"guilds_found": 0},
-        )
-        logger.info("No Discord guilds found to index")
-        await discord_client.close_bot()
-        return 0, "No Discord guilds found"
+        return 0, f"Failed to start Discord bot: {e!s}"

     # Track results
     documents_indexed = 0
     documents_skipped = 0
+    documents_failed = 0
+    duplicate_content_count = 0
     skipped_channels: list[str] = []

     # Heartbeat tracking - update notification periodically to prevent appearing stuck
     last_heartbeat_time = time.time()

-    # Process each guild and channel
+    # Use the configured guild info
+    guild_id = configured_guild_id
+    guild_name = configured_guild_name or "Unknown Guild"
+
     await task_logger.log_task_progress(
         log_entry,
-        f"Starting to process {len(guilds)} Discord guilds",
-        {"stage": "process_guilds", "total_guilds": len(guilds)},
+        f"Processing Discord guild: {guild_name}",
+        {"stage": "process_guild", "guild_id": guild_id, "guild_name": guild_name},
     )

+    # =======================================================================
+    # PHASE 1: Collect all messages and create pending documents
+    # This makes ALL documents visible in the UI immediately with pending status
+    # =======================================================================
+    messages_to_process = []  # List of dicts with document and message data
+    new_documents_created = False
+
     try:
-        for guild in guilds:
-            # Check if it's time for a heartbeat update
-            if (
-                on_heartbeat_callback
-                and (time.time() - last_heartbeat_time)
-                >= HEARTBEAT_INTERVAL_SECONDS
-            ):
-                await on_heartbeat_callback(documents_indexed)
-                last_heartbeat_time = time.time()
-
-            guild_id = guild["id"]
-            guild_name = guild["name"]
-            logger.info(f"Processing guild: {guild_name} ({guild_id})")
-
-            try:
-                channels = await discord_client.get_text_channels(guild_id)
-                if not channels:
-                    logger.info(
-                        f"No channels found in guild {guild_name}. Skipping."
-                    )
-                    skipped_channels.append(f"{guild_name} (no channels)")
-                    documents_skipped += 1
-                    continue
+        logger.info(f"Processing guild: {guild_name} ({guild_id})")
+
+        try:
+            channels = await discord_client.get_text_channels(guild_id)
+            if not channels:
+                logger.info(
+                    f"No channels found in guild {guild_name}. Skipping."
+                )
+                skipped_channels.append(f"{guild_name} (no channels)")
+            else:

                 for channel in channels:
                     channel_id = channel["id"]
                     channel_name = channel["name"]

@@ -343,14 +374,12 @@ async def index_discord_messages(
                         skipped_channels.append(
                             f"{guild_name}#{channel_name} (fetch error)"
                         )
-                        documents_skipped += 1
                         continue

                     if not messages:
                         logger.info(
                             f"No messages found in channel {channel_name} for the specified date range."
                         )
-                        documents_skipped += 1
                         continue

                     # Filter/format messages

@@ -365,7 +394,6 @@ async def index_discord_messages(
                         logger.info(
                             f"No valid messages found in channel {channel_name} after filtering."
                         )
-                        documents_skipped += 1
                         continue

                     # Process each message as an individual document (like Slack)

@@ -427,55 +455,27 @@ async def index_discord_messages(
                         if existing_document:
                             # Document exists - check if content has changed
                             if existing_document.content_hash == content_hash:
-                                logger.info(
-                                    f"Document for Discord message {msg_id} in {guild_name}#{channel_name} unchanged. Skipping."
-                                )
+                                # Ensure status is ready (might have been stuck in processing/pending)
+                                if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
+                                    existing_document.status = DocumentStatus.ready()
                                 documents_skipped += 1
                                 continue
-                            else:
-                                # Content has changed - update the existing document
-                                logger.info(
-                                    f"Content changed for Discord message {msg_id} in {guild_name}#{channel_name}. Updating document."
-                                )
-
-                                # Update chunks and embedding
-                                chunks = await create_document_chunks(
-                                    combined_document_string
-                                )
-                                doc_embedding = (
-                                    config.embedding_model_instance.embed(
-                                        combined_document_string
-                                    )
-                                )
-
-                                # Update existing document
-                                existing_document.content = combined_document_string
-                                existing_document.content_hash = content_hash
-                                existing_document.embedding = doc_embedding
-                                existing_document.document_metadata = {
-                                    "guild_name": guild_name,
-                                    "guild_id": guild_id,
-                                    "channel_name": channel_name,
-                                    "channel_id": channel_id,
-                                    "message_id": msg_id,
-                                    "message_timestamp": msg_timestamp,
-                                    "message_user_name": msg_user_name,
-                                    "indexed_at": datetime.now(UTC).strftime(
-                                        "%Y-%m-%d %H:%M:%S"
-                                    ),
-                                }
-                                # Delete old chunks and add new ones
-                                existing_document.chunks = chunks
-                                existing_document.updated_at = (
-                                    get_current_timestamp()
-                                )
-                                documents_indexed += 1
-                                logger.info(
-                                    f"Successfully updated Discord message {msg_id}"
-                                )
-                                continue
+
+                            # Queue existing document for update (will be set to processing in Phase 2)
+                            messages_to_process.append({
+                                'document': existing_document,
+                                'is_new': False,
+                                'combined_document_string': combined_document_string,
+                                'content_hash': content_hash,
+                                'guild_name': guild_name,
+                                'guild_id': guild_id,
+                                'channel_name': channel_name,
+                                'channel_id': channel_id,
+                                'message_id': msg_id,
+                                'message_timestamp': msg_timestamp,
+                                'message_user_name': msg_user_name,
+                            })
+                            continue

                         # Document doesn't exist by unique_identifier_hash
                         # Check if a document with the same content_hash exists (from another connector)

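
The two hashes checked here do different jobs: `unique_identifier_hash` identifies the message itself (stable when its content is edited), while `content_hash` tracks the rendered content and catches the same content arriving via another connector. The real helpers come from `app.utils.document_converters` and are not shown in this diff; a sketch of plausible derivations:

import hashlib


def _sha256(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


# Assumed shapes - the actual hash helpers are defined elsewhere in the codebase.
def make_unique_identifier_hash(guild_id: str, channel_id: str, message_id: str) -> str:
    # Stable per message: survives content edits, so updates hit the same row.
    return _sha256(f"discord:{guild_id}:{channel_id}:{message_id}")


def make_content_hash(combined_document_string: str) -> str:
    # Changes whenever the rendered document changes; equality means "skip".
    return _sha256(combined_document_string)
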
@@ -492,19 +492,11 @@ async def index_discord_messages(
                                 f"(existing document ID: {duplicate_by_content.id}, "
                                 f"type: {duplicate_by_content.document_type}). Skipping."
                             )
+                            duplicate_content_count += 1
                             documents_skipped += 1
                             continue

-                        # Document doesn't exist - create new one
-                        # Process chunks
-                        chunks = await create_document_chunks(
-                            combined_document_string
-                        )
-                        doc_embedding = config.embedding_model_instance.embed(
-                            combined_document_string
-                        )
-
-                        # Create and store new document
+                        # Create new document with PENDING status (visible in UI immediately)
                         document = Document(
                             search_space_id=search_space_id,
                             title=f"{guild_name}#{channel_name}",

@@ -515,87 +507,171 @@ async def index_discord_messages(
                                 "channel_name": channel_name,
                                 "channel_id": channel_id,
                                 "message_id": msg_id,
-                                "message_timestamp": msg_timestamp,
-                                "message_user_name": msg_user_name,
-                                "indexed_at": datetime.now(UTC).strftime(
-                                    "%Y-%m-%d %H:%M:%S"
-                                ),
+                                "connector_id": connector_id,
                             },
-                            content=combined_document_string,
-                            embedding=doc_embedding,
-                            chunks=chunks,
-                            content_hash=content_hash,
+                            content="Pending...",  # Placeholder until processed
+                            content_hash=unique_identifier_hash,  # Temporary unique value - updated when ready
                             unique_identifier_hash=unique_identifier_hash,
+                            embedding=None,
+                            chunks=[],  # Empty at creation - safe for async
+                            status=DocumentStatus.pending(),  # Pending until processing starts
                             updated_at=get_current_timestamp(),
                             created_by_id=user_id,
                             connector_id=connector_id,
                         )
                         session.add(document)
-                        documents_indexed += 1
-
-                        # Batch commit every 10 documents
-                        if documents_indexed % 10 == 0:
-                            logger.info(
-                                f"Committing batch: {documents_indexed} Discord messages processed so far"
-                            )
-                            await session.commit()
-
-                logger.info(
-                    f"Successfully indexed channel {guild_name}#{channel_name} with {len(formatted_messages)} messages"
-                )
-
-            except Exception as e:
-                logger.error(
-                    f"Error processing guild {guild_name}: {e!s}", exc_info=True
-                )
-                skipped_channels.append(f"{guild_name} (processing error)")
-                documents_skipped += 1
-                continue
+                        new_documents_created = True
+
+                        messages_to_process.append({
+                            'document': document,
+                            'is_new': True,
+                            'combined_document_string': combined_document_string,
+                            'content_hash': content_hash,
+                            'guild_name': guild_name,
+                            'guild_id': guild_id,
+                            'channel_name': channel_name,
+                            'channel_id': channel_id,
+                            'message_id': msg_id,
+                            'message_timestamp': msg_timestamp,
+                            'message_user_name': msg_user_name,
+                        })
+
+        except Exception as e:
+            logger.error(
+                f"Error processing guild {guild_name}: {e!s}", exc_info=True
+            )
+            skipped_channels.append(f"{guild_name} (processing error)")

     finally:
         await discord_client.close_bot()

-    # Update last_indexed_at only if we indexed at least one
-    if documents_indexed > 0:
-        await update_connector_last_indexed(session, connector, update_last_indexed)
+    # Commit all pending documents - they all appear in UI now
+    if new_documents_created:
+        logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents")
+        await session.commit()
+
+    # =======================================================================
+    # PHASE 2: Process each document one by one
+    # Each document transitions: pending → processing → ready/failed
+    # =======================================================================
+    logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
+
+    for item in messages_to_process:
+        # Send heartbeat periodically
+        if on_heartbeat_callback:
+            current_time = time.time()
+            if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
+                await on_heartbeat_callback(documents_indexed)
+                last_heartbeat_time = current_time
+
+        document = item['document']
+        try:
+            # Set to PROCESSING and commit - shows "processing" in UI for THIS document only
+            document.status = DocumentStatus.processing()
+            await session.commit()
+
+            # Heavy processing (embeddings, chunks)
+            chunks = await create_document_chunks(item['combined_document_string'])
+            doc_embedding = config.embedding_model_instance.embed(
+                item['combined_document_string']
+            )
+
+            # Update document to READY with actual content
+            document.title = f"{item['guild_name']}#{item['channel_name']}"
+            document.content = item['combined_document_string']
+            document.content_hash = item['content_hash']
+            document.embedding = doc_embedding
+            document.document_metadata = {
+                "guild_name": item['guild_name'],
+                "guild_id": item['guild_id'],
+                "channel_name": item['channel_name'],
+                "channel_id": item['channel_id'],
+                "message_id": item['message_id'],
+                "message_timestamp": item['message_timestamp'],
+                "message_user_name": item['message_user_name'],
+                "indexed_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"),
+                "connector_id": connector_id,
+            }
+            safe_set_chunks(document, chunks)
+            document.updated_at = get_current_timestamp()
+            document.status = DocumentStatus.ready()
+            documents_indexed += 1
+
+            # Batch commit every 10 documents (for ready status updates)
+            if documents_indexed % 10 == 0:
+                logger.info(
+                    f"Committing batch: {documents_indexed} Discord messages processed so far"
+                )
+                await session.commit()
+        except Exception as e:
+            logger.error(f"Error processing Discord message: {e!s}", exc_info=True)
+            # Mark document as failed with reason (visible in UI)
+            try:
+                document.status = DocumentStatus.failed(str(e))
+                document.updated_at = get_current_timestamp()
+            except Exception as status_error:
+                logger.error(f"Failed to update document status to failed: {status_error}")
+            documents_failed += 1
+            continue
+
+    # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
+    await update_connector_last_indexed(session, connector, update_last_indexed)

     # Final commit for any remaining documents not yet committed in batches
     logger.info(
         f"Final commit: Total {documents_indexed} Discord messages processed"
     )
-    await session.commit()
-
-    # Prepare result message
-    result_message = None
-    if skipped_channels:
-        result_message = (
-            f"Processed {documents_indexed} messages. Skipped {len(skipped_channels)} channels: "
-            + ", ".join(skipped_channels)
-        )
-    else:
-        result_message = f"Processed {documents_indexed} messages."
+    try:
+        await session.commit()
+        logger.info(
+            "Successfully committed all Discord document changes to database"
+        )
+    except Exception as e:
+        # Handle any remaining integrity errors gracefully (race conditions, etc.)
+        if (
+            "duplicate key value violates unique constraint" in str(e).lower()
+            or "uniqueviolationerror" in str(e).lower()
+        ):
+            logger.warning(
+                f"Duplicate content_hash detected during final commit. "
+                f"Rolling back and continuing. Error: {e!s}"
+            )
+            await session.rollback()
+        else:
+            raise
+
+    # Build warning message if there were issues
+    warning_parts = []
+    if duplicate_content_count > 0:
+        warning_parts.append(f"{duplicate_content_count} duplicate")
+    if documents_failed > 0:
+        warning_parts.append(f"{documents_failed} failed")
+    if skipped_channels:
+        warning_parts.append(f"{len(skipped_channels)} channels skipped")
+    warning_message = ", ".join(warning_parts) if warning_parts else None

     # Log success
     await task_logger.log_task_success(
         log_entry,
         f"Successfully completed Discord indexing for connector {connector_id}",
         {
-            "messages_processed": documents_indexed,
             "documents_indexed": documents_indexed,
             "documents_skipped": documents_skipped,
+            "documents_failed": documents_failed,
+            "duplicate_content_count": duplicate_content_count,
             "skipped_channels_count": len(skipped_channels),
-            "guilds_processed": len(guilds),
-            "result_message": result_message,
+            "guild_id": guild_id,
+            "guild_name": guild_name,
         },
     )

     logger.info(
-        f"Discord indexing completed: {documents_indexed} new messages, {documents_skipped} skipped"
+        f"Discord indexing completed for guild {guild_name}: {documents_indexed} ready, {documents_skipped} skipped, "
+        f"{documents_failed} failed ({duplicate_content_count} duplicate content)"
     )

-    return (
-        documents_indexed,
-        None,
-    )  # Return None on success (result_message is for logging only)
+    return documents_indexed, warning_message

     except SQLAlchemyError as db_error:
         await session.rollback()
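
The final-commit guard above matches duplicate-key failures by substring. An equivalent helper written against SQLAlchemy's `IntegrityError` (a sketch of the same idea, not code from this commit) could read:

from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession


async def commit_tolerating_duplicates(session: AsyncSession, logger) -> bool:
    # Swallow unique-constraint races (e.g., two indexers inserting the same
    # content_hash), roll back, and report failure; re-raise anything else.
    try:
        await session.commit()
        return True
    except IntegrityError as e:
        if "duplicate key value violates unique constraint" in str(e).lower():
            logger.warning(f"Duplicate detected during commit, rolling back: {e!s}")
            await session.rollback()
            return False
        raise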


@@ -1,17 +1,21 @@
 """
 Microsoft Teams connector indexer.
+
+Implements 2-phase document status updates for real-time UI feedback:
+- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
+- Phase 2: Process each document: pending → processing → ready/failed
 """

 import time
 from collections.abc import Awaitable, Callable
-from datetime import UTC
+from datetime import UTC, datetime

 from sqlalchemy.exc import SQLAlchemyError
 from sqlalchemy.ext.asyncio import AsyncSession

 from app.config import config
 from app.connectors.teams_history import TeamsHistory
-from app.db import Document, DocumentType, SearchSourceConnectorType
+from app.db import Document, DocumentStatus, DocumentType, SearchSourceConnectorType
 from app.services.task_logging_service import TaskLoggingService
 from app.utils.document_converters import (
     create_document_chunks,

@@ -27,6 +31,7 @@ from .base import (
     get_connector_by_id,
     get_current_timestamp,
     logger,
+    safe_set_chunks,
     update_connector_last_indexed,
 )

@@ -50,6 +55,10 @@ async def index_teams_messages(
     """
     Index Microsoft Teams messages from all accessible teams and channels.
+
+    Implements 2-phase document status updates for real-time UI feedback:
+    - Phase 1: Create all documents with 'pending' status (visible in UI immediately)
+    - Phase 2: Process each document: pending → processing → ready/failed

     Args:
         session: Database session
         connector_id: ID of the Teams connector

@@ -165,11 +174,16 @@ async def index_teams_messages(
             f"No Teams found for connector {connector_id}",
             {"teams_found": 0},
         )
-        return 0, "No Teams found"
+        # CRITICAL: Update timestamp even when no teams found so Electric SQL syncs
+        await update_connector_last_indexed(session, connector, update_last_indexed)
+        await session.commit()
+        return 0, None  # Return None (not error) when no items found

     # Track the number of documents indexed
     documents_indexed = 0
     documents_skipped = 0
+    documents_failed = 0
+    duplicate_content_count = 0
     skipped_channels = []

     # Heartbeat tracking - update notification periodically to prevent appearing stuck

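
Both indexers throttle `on_heartbeat_callback` against `HEARTBEAT_INTERVAL_SECONDS` so long runs keep the progress notification fresh. The throttle they share reduces to something like this (a sketch; the interval value is assumed):

import time

HEARTBEAT_INTERVAL_SECONDS = 30  # assumed value; the real constant is defined elsewhere


async def maybe_heartbeat(callback, documents_indexed: int, last_heartbeat_time: float) -> float:
    # Fire the caller-supplied callback at most once per interval and return
    # the (possibly updated) timestamp of the last heartbeat.
    now = time.time()
    if callback and now - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
        await callback(documents_indexed)
        return now
    return last_heartbeat_time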
@@ -182,8 +196,6 @@ async def index_teams_messages(
     )

     # Convert date strings to datetime objects for filtering
-    from datetime import datetime
-
     start_datetime = None
     end_datetime = None
     if start_date_str:

@@ -197,16 +209,14 @@ async def index_teams_messages(
                 hour=23, minute=59, second=59, tzinfo=UTC
             )

-    # Process each team
+    # =======================================================================
+    # PHASE 1: Collect all messages and create pending documents
+    # This makes ALL documents visible in the UI immediately with pending status
+    # =======================================================================
+    messages_to_process = []  # List of dicts with document and message data
+    new_documents_created = False
+
     for team in teams:
-        # Check if it's time for a heartbeat update
-        if (
-            on_heartbeat_callback
-            and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS
-        ):
-            await on_heartbeat_callback(documents_indexed)
-            last_heartbeat_time = time.time()
-
         team_id = team.get("id")
         team_name = team.get("displayName", "Unknown Team")

@@ -239,7 +249,6 @@ async def index_teams_messages(
                         channel_name,
                         team_name,
                     )
-                    documents_skipped += 1
                     continue

                 # Process each message

@@ -322,60 +331,27 @@ async def index_teams_messages(
                     if existing_document:
                         # Document exists - check if content has changed
                         if existing_document.content_hash == content_hash:
-                            logger.info(
-                                "Document for Teams message %s in channel %s unchanged. Skipping.",
-                                message_id,
-                                channel_name,
-                            )
+                            # Ensure status is ready (might have been stuck in processing/pending)
+                            if not DocumentStatus.is_state(existing_document.status, DocumentStatus.READY):
+                                existing_document.status = DocumentStatus.ready()
                             documents_skipped += 1
                             continue
-                        else:
-                            # Content has changed - update the existing document
-                            logger.info(
-                                "Content changed for Teams message %s in channel %s. Updating document.",
-                                message_id,
-                                channel_name,
-                            )
-
-                            # Update chunks and embedding
-                            chunks = await create_document_chunks(
-                                combined_document_string
-                            )
-                            doc_embedding = (
-                                config.embedding_model_instance.embed(
-                                    combined_document_string
-                                )
-                            )
-
-                            # Update existing document
-                            existing_document.content = combined_document_string
-                            existing_document.content_hash = content_hash
-                            existing_document.embedding = doc_embedding
-                            existing_document.document_metadata = {
-                                "team_name": team_name,
-                                "team_id": team_id,
-                                "channel_name": channel_name,
-                                "channel_id": channel_id,
-                                "start_date": start_date_str,
-                                "end_date": end_date_str,
-                                "message_count": len(messages),
-                                "indexed_at": datetime.now().strftime(
-                                    "%Y-%m-%d %H:%M:%S"
-                                ),
-                            }
-                            # Delete old chunks and add new ones
-                            existing_document.chunks = chunks
-                            existing_document.updated_at = (
-                                get_current_timestamp()
-                            )
-                            documents_indexed += 1
-                            logger.info(
-                                "Successfully updated Teams message %s",
-                                message_id,
-                            )
-                            continue
+
+                        # Queue existing document for update (will be set to processing in Phase 2)
+                        messages_to_process.append({
+                            'document': existing_document,
+                            'is_new': False,
+                            'combined_document_string': combined_document_string,
+                            'content_hash': content_hash,
+                            'team_name': team_name,
+                            'team_id': team_id,
+                            'channel_name': channel_name,
+                            'channel_id': channel_id,
+                            'message_id': message_id,
+                            'start_date': start_date_str,
+                            'end_date': end_date_str,
+                        })
+                        continue

                     # Document doesn't exist by unique_identifier_hash
                     # Check if a document with the same content_hash exists (from another connector)

@@ -395,19 +371,11 @@ async def index_teams_messages(
                             duplicate_by_content.id,
                             duplicate_by_content.document_type,
                         )
+                        duplicate_content_count += 1
                         documents_skipped += 1
                         continue

-                    # Document doesn't exist - create new one
-                    # Process chunks
-                    chunks = await create_document_chunks(
-                        combined_document_string
-                    )
-                    doc_embedding = config.embedding_model_instance.embed(
-                        combined_document_string
-                    )
-
-                    # Create and store new document
+                    # Create new document with PENDING status (visible in UI immediately)
                     document = Document(
                         search_space_id=search_space_id,
                         title=f"{team_name} - {channel_name}",

@@ -417,40 +385,34 @@ async def index_teams_messages(
                             "team_id": team_id,
                             "channel_name": channel_name,
                             "channel_id": channel_id,
-                            "start_date": start_date_str,
-                            "end_date": end_date_str,
-                            "message_count": len(messages),
-                            "indexed_at": datetime.now().strftime(
-                                "%Y-%m-%d %H:%M:%S"
-                            ),
+                            "connector_id": connector_id,
                         },
-                        content=combined_document_string,
-                        embedding=doc_embedding,
-                        chunks=chunks,
-                        content_hash=content_hash,
+                        content="Pending...",  # Placeholder until processed
+                        content_hash=unique_identifier_hash,  # Temporary unique value - updated when ready
                         unique_identifier_hash=unique_identifier_hash,
+                        embedding=None,
+                        chunks=[],  # Empty at creation - safe for async
+                        status=DocumentStatus.pending(),  # Pending until processing starts
                         updated_at=get_current_timestamp(),
                         created_by_id=user_id,
                         connector_id=connector_id,
                     )
                     session.add(document)
-                    documents_indexed += 1
-
-                    # Batch commit every 10 documents
-                    if documents_indexed % 10 == 0:
-                        logger.info(
-                            "Committing batch: %s Teams messages processed so far",
-                            documents_indexed,
-                        )
-                        await session.commit()
-
-                    logger.info(
-                        "Successfully indexed channel %s in team %s with %s messages",
-                        channel_name,
-                        team_name,
-                        len(messages),
-                    )
+                    new_documents_created = True
+
+                    messages_to_process.append({
+                        'document': document,
+                        'is_new': True,
+                        'combined_document_string': combined_document_string,
+                        'content_hash': content_hash,
+                        'team_name': team_name,
+                        'team_id': team_id,
+                        'channel_name': channel_name,
+                        'channel_id': channel_id,
+                        'message_id': message_id,
+                        'start_date': start_date_str,
+                        'end_date': end_date_str,
+                    })

                 except Exception as e:
                     logger.error(

@@ -462,54 +424,141 @@ async def index_teams_messages(
                     skipped_channels.append(
                         f"{team_name}/{channel_name} (processing error)"
                     )
-                    documents_skipped += 1
                     continue

         except Exception as e:
             logger.error("Error processing team %s: %s", team_name, str(e))
             continue

-    # Update the last_indexed_at timestamp for the connector only if requested
-    # and if we successfully indexed at least one document
-    total_processed = documents_indexed
-    if total_processed > 0:
-        await update_connector_last_indexed(session, connector, update_last_indexed)
+    # Commit all pending documents - they all appear in UI now
+    if new_documents_created:
+        logger.info(f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents")
+        await session.commit()
+
+    # =======================================================================
+    # PHASE 2: Process each document one by one
+    # Each document transitions: pending → processing → ready/failed
+    # =======================================================================
+    logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
+
+    for item in messages_to_process:
+        # Send heartbeat periodically
+        if on_heartbeat_callback:
+            current_time = time.time()
+            if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS:
+                await on_heartbeat_callback(documents_indexed)
+                last_heartbeat_time = current_time
+
+        document = item['document']
+        try:
+            # Set to PROCESSING and commit - shows "processing" in UI for THIS document only
+            document.status = DocumentStatus.processing()
+            await session.commit()
+
+            # Heavy processing (embeddings, chunks)
+            chunks = await create_document_chunks(item['combined_document_string'])
+            doc_embedding = config.embedding_model_instance.embed(
+                item['combined_document_string']
+            )
+
+            # Update document to READY with actual content
+            document.title = f"{item['team_name']} - {item['channel_name']}"
+            document.content = item['combined_document_string']
+            document.content_hash = item['content_hash']
+            document.embedding = doc_embedding
+            document.document_metadata = {
+                "team_name": item['team_name'],
+                "team_id": item['team_id'],
+                "channel_name": item['channel_name'],
+                "channel_id": item['channel_id'],
+                "start_date": item['start_date'],
+                "end_date": item['end_date'],
+                "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
+                "connector_id": connector_id,
+            }
+            safe_set_chunks(document, chunks)
+            document.updated_at = get_current_timestamp()
+            document.status = DocumentStatus.ready()
+            documents_indexed += 1
+
+            # Batch commit every 10 documents (for ready status updates)
+            if documents_indexed % 10 == 0:
+                logger.info(
+                    "Committing batch: %s Teams messages processed so far",
+                    documents_indexed,
+                )
+                await session.commit()
+        except Exception as e:
+            logger.error(f"Error processing Teams message: {e!s}", exc_info=True)
+            # Mark document as failed with reason (visible in UI)
+            try:
+                document.status = DocumentStatus.failed(str(e))
+                document.updated_at = get_current_timestamp()
+            except Exception as status_error:
+                logger.error(f"Failed to update document status to failed: {status_error}")
+            documents_failed += 1
+            continue
+
+    # CRITICAL: Always update timestamp (even if 0 documents indexed) so Electric SQL syncs
+    await update_connector_last_indexed(session, connector, update_last_indexed)

     # Final commit for any remaining documents not yet committed in batches
     logger.info(
         "Final commit: Total %s Teams messages processed", documents_indexed
     )
-    await session.commit()
-
-    # Prepare result message
-    result_message = None
-    if skipped_channels:
-        result_message = f"Processed {total_processed} messages. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}"
-    else:
-        result_message = f"Processed {total_processed} messages."
+    try:
+        await session.commit()
+        logger.info(
+            "Successfully committed all Teams document changes to database"
+        )
+    except Exception as e:
+        # Handle any remaining integrity errors gracefully (race conditions, etc.)
+        if (
+            "duplicate key value violates unique constraint" in str(e).lower()
+            or "uniqueviolationerror" in str(e).lower()
+        ):
+            logger.warning(
+                f"Duplicate content_hash detected during final commit. "
+                f"Rolling back and continuing. Error: {e!s}"
+            )
+            await session.rollback()
+        else:
+            raise
+
+    # Build warning message if there were issues
+    warning_parts = []
+    if duplicate_content_count > 0:
+        warning_parts.append(f"{duplicate_content_count} duplicate")
+    if documents_failed > 0:
+        warning_parts.append(f"{documents_failed} failed")
+    if skipped_channels:
+        warning_parts.append(f"{len(skipped_channels)} channels skipped")
+    warning_message = ", ".join(warning_parts) if warning_parts else None

     # Log success
     await task_logger.log_task_success(
         log_entry,
         f"Successfully completed Teams indexing for connector {connector_id}",
         {
-            "messages_processed": total_processed,
             "documents_indexed": documents_indexed,
             "documents_skipped": documents_skipped,
+            "documents_failed": documents_failed,
+            "duplicate_content_count": duplicate_content_count,
             "skipped_channels_count": len(skipped_channels),
-            "result_message": result_message,
         },
     )

     logger.info(
-        "Teams indexing completed: %s new messages, %s skipped",
+        "Teams indexing completed: %s ready, %s skipped, %s failed "
+        "(%s duplicate content)",
         documents_indexed,
         documents_skipped,
+        documents_failed,
+        duplicate_content_count,
     )

-    return (
-        total_processed,
-        None,
-    )  # Return None on success (result_message is for logging only)
+    return documents_indexed, warning_message

     except SQLAlchemyError as db_error:
         await session.rollback()