Merge pull request #796 from AnishSarkar22/feat/sur-149-batch-index

impr: batch index for messaging connectors & some fixes
This commit is contained in:
Rohan Verma 2026-02-09 15:00:16 -08:00 committed by GitHub
commit 26fd61fcbb
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 625 additions and 241 deletions

View file

@ -1,7 +1,10 @@
"""
Discord connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
Implements batch indexing: groups up to DISCORD_BATCH_SIZE messages per channel
into a single document for efficient indexing and better conversational context.
Uses 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
"""
@ -41,6 +44,72 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
# Number of messages to combine into a single document for batch indexing.
# Grouping messages improves conversational context in embeddings/chunks and
# drastically reduces the number of documents, embedding calls, and DB overhead.
DISCORD_BATCH_SIZE = 100
def _build_batch_document_string(
guild_name: str,
guild_id: str,
channel_name: str,
channel_id: str,
messages: list[dict],
) -> str:
"""
Combine multiple Discord messages into a single document string.
Each message is formatted with its timestamp and author, and all messages
are concatenated into a conversation-style document. The chunker will
later split this into overlapping windows of ~8-10 consecutive messages,
preserving conversational context in each chunk's embedding.
Args:
guild_name: Name of the Discord guild
guild_id: ID of the Discord guild
channel_name: Name of the channel
channel_id: ID of the channel
messages: List of message dicts with 'author_name', 'created_at', 'content'
Returns:
Formatted document string with metadata and conversation content
"""
first_msg_time = messages[0].get("created_at", "Unknown")
last_msg_time = messages[-1].get("created_at", "Unknown")
metadata_lines = [
f"GUILD_NAME: {guild_name}",
f"GUILD_ID: {guild_id}",
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
f"MESSAGE_COUNT: {len(messages)}",
f"FIRST_MESSAGE_TIME: {first_msg_time}",
f"LAST_MESSAGE_TIME: {last_msg_time}",
]
conversation_lines = []
for msg in messages:
author = msg.get("author_name", "Unknown User")
timestamp = msg.get("created_at", "Unknown Time")
content = msg.get("content", "")
conversation_lines.append(f"[{timestamp}] {author}: {content}")
metadata_sections = [
("METADATA", metadata_lines),
(
"CONTENT",
[
"FORMAT: markdown",
"TEXT_START",
"\n".join(conversation_lines),
"TEXT_END",
],
),
]
return build_document_metadata_markdown(metadata_sections)
async def index_discord_messages(
session: AsyncSession,
@ -55,6 +124,12 @@ async def index_discord_messages(
"""
Index Discord messages from the configured guild's channels.
Messages are grouped into batches of DISCORD_BATCH_SIZE per channel,
so each document contains up to 100 consecutive messages with full
conversational context. This reduces document count, embedding calls,
and DB overhead by ~100x while improving search quality through
context-aware chunk embeddings.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
@ -324,6 +399,7 @@ async def index_discord_messages(
documents_skipped = 0
documents_failed = 0
duplicate_content_count = 0
total_messages_collected = 0
skipped_channels: list[str] = []
# Heartbeat tracking - update notification periodically to prevent appearing stuck
@ -340,10 +416,12 @@ async def index_discord_messages(
)
# =======================================================================
# PHASE 1: Collect all messages and create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# PHASE 1: Collect messages, group into batches, and create pending documents
# Messages are grouped into batches of DISCORD_BATCH_SIZE per channel.
# Each batch becomes a single document with full conversational context.
# All documents are visible in the UI immediately with pending status.
# =======================================================================
messages_to_process = [] # List of dicts with document and message data
batches_to_process = [] # List of dicts with document and batch data
new_documents_created = False
try:
@ -394,44 +472,35 @@ async def index_discord_messages(
)
continue
# Process each message as an individual document (like Slack)
for msg in formatted_messages:
msg_id = msg.get("id", "")
msg_user_name = msg.get("author_name", "Unknown User")
msg_timestamp = msg.get("created_at", "Unknown Time")
msg_text = msg.get("content", "")
total_messages_collected += len(formatted_messages)
# Format document metadata (similar to Slack)
metadata_sections = [
(
"METADATA",
[
f"GUILD_NAME: {guild_name}",
f"GUILD_ID: {guild_id}",
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
f"MESSAGE_TIMESTAMP: {msg_timestamp}",
f"MESSAGE_USER_NAME: {msg_user_name}",
],
),
(
"CONTENT",
[
"FORMAT: markdown",
"TEXT_START",
msg_text,
"TEXT_END",
],
),
# =======================================================
# Group messages into batches of DISCORD_BATCH_SIZE
# Each batch becomes a single document with conversation context
# =======================================================
for batch_start in range(
0, len(formatted_messages), DISCORD_BATCH_SIZE
):
batch = formatted_messages[
batch_start : batch_start + DISCORD_BATCH_SIZE
]
# Build the document string
combined_document_string = build_document_metadata_markdown(
metadata_sections
# Build combined document string from all messages in this batch
combined_document_string = _build_batch_document_string(
guild_name=guild_name,
guild_id=guild_id,
channel_name=channel_name,
channel_id=channel_id,
messages=batch,
)
# Generate unique identifier hash for this Discord message
unique_identifier = f"{channel_id}_{msg_id}"
# Generate unique identifier for this batch using
# channel_id + first message ID + last message ID
first_msg_id = batch[0].get("id", "")
last_msg_id = batch[-1].get("id", "")
unique_identifier = (
f"{channel_id}_{first_msg_id}_{last_msg_id}"
)
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.DISCORD_CONNECTOR,
unique_identifier,
@ -464,7 +533,7 @@ async def index_discord_messages(
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append(
batches_to_process.append(
{
"document": existing_document,
"is_new": False,
@ -474,9 +543,15 @@ async def index_discord_messages(
"guild_id": guild_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": msg_id,
"message_timestamp": msg_timestamp,
"message_user_name": msg_user_name,
"first_message_id": first_msg_id,
"last_message_id": last_msg_id,
"first_message_time": batch[0].get(
"created_at", "Unknown"
),
"last_message_time": batch[-1].get(
"created_at", "Unknown"
),
"message_count": len(batch),
}
)
continue
@ -492,7 +567,7 @@ async def index_discord_messages(
if duplicate_by_content:
logger.info(
f"Discord message {msg_id} in {guild_name}#{channel_name} already indexed by another connector "
f"Discord batch ({len(batch)} msgs) in {guild_name}#{channel_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
@ -510,7 +585,9 @@ async def index_discord_messages(
"guild_id": guild_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": msg_id,
"first_message_id": first_msg_id,
"last_message_id": last_msg_id,
"message_count": len(batch),
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
@ -526,7 +603,7 @@ async def index_discord_messages(
session.add(document)
new_documents_created = True
messages_to_process.append(
batches_to_process.append(
{
"document": document,
"is_new": True,
@ -536,12 +613,23 @@ async def index_discord_messages(
"guild_id": guild_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": msg_id,
"message_timestamp": msg_timestamp,
"message_user_name": msg_user_name,
"first_message_id": first_msg_id,
"last_message_id": last_msg_id,
"first_message_time": batch[0].get(
"created_at", "Unknown"
),
"last_message_time": batch[-1].get(
"created_at", "Unknown"
),
"message_count": len(batch),
}
)
logger.info(
f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}, "
f"grouped into {(len(formatted_messages) + DISCORD_BATCH_SIZE - 1) // DISCORD_BATCH_SIZE} batch(es)"
)
except Exception as e:
logger.error(
f"Error processing guild {guild_name}: {e!s}", exc_info=True
@ -554,17 +642,18 @@ async def index_discord_messages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
f"Phase 1: Committing {len([b for b in batches_to_process if b['is_new']])} pending batch documents "
f"({total_messages_collected} total messages across all channels)"
)
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# PHASE 2: Process each batch document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
logger.info(f"Phase 2: Processing {len(batches_to_process)} batch documents")
for item in messages_to_process:
for item in batches_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
@ -594,9 +683,11 @@ async def index_discord_messages(
"guild_id": item["guild_id"],
"channel_name": item["channel_name"],
"channel_id": item["channel_id"],
"message_id": item["message_id"],
"message_timestamp": item["message_timestamp"],
"message_user_name": item["message_user_name"],
"first_message_id": item["first_message_id"],
"last_message_id": item["last_message_id"],
"first_message_time": item["first_message_time"],
"last_message_time": item["last_message_time"],
"message_count": item["message_count"],
"indexed_at": datetime.now(UTC).strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -609,12 +700,14 @@ async def index_discord_messages(
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Discord messages processed so far"
f"Committing batch: {documents_indexed} batch documents processed so far"
)
await session.commit()
except Exception as e:
logger.error(f"Error processing Discord message: {e!s}", exc_info=True)
logger.error(
f"Error processing Discord batch document: {e!s}", exc_info=True
)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
@ -631,7 +724,8 @@ async def index_discord_messages(
# Final commit for any remaining documents not yet committed in batches
logger.info(
f"Final commit: Total {documents_indexed} Discord messages processed"
f"Final commit: Total {documents_indexed} batch documents processed "
f"(from {total_messages_collected} messages)"
)
try:
await session.commit()
@ -672,14 +766,18 @@ async def index_discord_messages(
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
"skipped_channels_count": len(skipped_channels),
"total_messages_collected": total_messages_collected,
"batch_size": DISCORD_BATCH_SIZE,
"guild_id": guild_id,
"guild_name": guild_name,
},
)
logger.info(
f"Discord indexing completed for guild {guild_name}: {documents_indexed} ready, {documents_skipped} skipped, "
f"{documents_failed} failed ({duplicate_content_count} duplicate content)"
f"Discord indexing completed for guild {guild_name}: "
f"{documents_indexed} batch docs ready (from {total_messages_collected} messages), "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return documents_indexed, warning_message

View file

@ -1,7 +1,10 @@
"""
Slack connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
Implements batch indexing: groups up to SLACK_BATCH_SIZE messages per channel
into a single document for efficient indexing and better conversational context.
Uses 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
"""
@ -42,6 +45,72 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
# Number of messages to combine into a single document for batch indexing.
# Grouping messages improves conversational context in embeddings/chunks and
# drastically reduces the number of documents, embedding calls, and DB overhead.
SLACK_BATCH_SIZE = 100
def _build_batch_document_string(
team_name: str,
team_id: str,
channel_name: str,
channel_id: str,
messages: list[dict],
) -> str:
"""
Combine multiple Slack messages into a single document string.
Each message is formatted with its timestamp and author, and all messages
are concatenated into a conversation-style document. The chunker will
later split this into overlapping windows of ~8-10 consecutive messages,
preserving conversational context in each chunk's embedding.
Args:
team_name: Name of the Slack workspace
team_id: ID of the Slack workspace
channel_name: Name of the channel
channel_id: ID of the channel
messages: List of formatted message dicts with 'user_name', 'datetime', 'text'
Returns:
Formatted document string with metadata and conversation content
"""
first_msg_time = messages[0].get("datetime", "Unknown")
last_msg_time = messages[-1].get("datetime", "Unknown")
metadata_lines = [
f"WORKSPACE_NAME: {team_name}",
f"WORKSPACE_ID: {team_id}",
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
f"MESSAGE_COUNT: {len(messages)}",
f"FIRST_MESSAGE_TIME: {first_msg_time}",
f"LAST_MESSAGE_TIME: {last_msg_time}",
]
conversation_lines = []
for msg in messages:
author = msg.get("user_name", "Unknown User")
timestamp = msg.get("datetime", "Unknown Time")
content = msg.get("text", "")
conversation_lines.append(f"[{timestamp}] {author}: {content}")
metadata_sections = [
("METADATA", metadata_lines),
(
"CONTENT",
[
"FORMAT: markdown",
"TEXT_START",
"\n".join(conversation_lines),
"TEXT_END",
],
),
]
return build_document_metadata_markdown(metadata_sections)
async def index_slack_messages(
session: AsyncSession,
@ -56,6 +125,16 @@ async def index_slack_messages(
"""
Index Slack messages from all accessible channels.
Messages are grouped into batches of SLACK_BATCH_SIZE per channel,
so each document contains up to 100 consecutive messages with full
conversational context. This reduces document count, embedding calls,
and DB overhead by ~100x while improving search quality through
context-aware chunk embeddings.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
Args:
session: Database session
connector_id: ID of the Slack connector
@ -109,6 +188,10 @@ async def index_slack_messages(
f"Connector with ID {connector_id} not found or is not a Slack connector",
)
# Extract workspace info from connector config
team_id = connector.config.get("team_id", "")
team_name = connector.config.get("team_name", "Unknown Workspace")
# Note: Token handling is now done automatically by SlackHistory
# with auto-refresh support. We just need to pass session and connector_id.
@ -182,6 +265,8 @@ async def index_slack_messages(
documents_indexed = 0
documents_skipped = 0
documents_failed = 0 # Track messages that failed processing
duplicate_content_count = 0
total_messages_collected = 0
skipped_channels = []
# Heartbeat tracking - update notification periodically to prevent appearing stuck
@ -194,10 +279,12 @@ async def index_slack_messages(
)
# =======================================================================
# PHASE 1: Collect all messages from all channels, create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# PHASE 1: Collect messages, group into batches, and create pending documents
# Messages are grouped into batches of SLACK_BATCH_SIZE per channel.
# Each batch becomes a single document with full conversational context.
# All documents are visible in the UI immediately with pending status.
# =======================================================================
messages_to_process = [] # List of dicts with document and message data
batches_to_process = [] # List of dicts with document and batch data
new_documents_created = False
for channel_obj in channels:
@ -264,40 +351,35 @@ async def index_slack_messages(
documents_skipped += 1
continue # Skip if no valid messages after filtering
for msg in formatted_messages:
timestamp = msg.get("datetime", "Unknown Time")
msg_ts = msg.get("ts", timestamp) # Get original Slack timestamp
msg_user_name = msg.get("user_name", "Unknown User")
msg_user_email = msg.get("user_email", "Unknown Email")
msg_text = msg.get("text", "")
total_messages_collected += len(formatted_messages)
# Format document metadata
metadata_sections = [
(
"METADATA",
[
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
f"MESSAGE_TIMESTAMP: {timestamp}",
f"MESSAGE_USER_NAME: {msg_user_name}",
f"MESSAGE_USER_EMAIL: {msg_user_email}",
],
),
(
"CONTENT",
["FORMAT: markdown", "TEXT_START", msg_text, "TEXT_END"],
),
# =======================================================
# Group messages into batches of SLACK_BATCH_SIZE
# Each batch becomes a single document with conversation context
# =======================================================
for batch_start in range(0, len(formatted_messages), SLACK_BATCH_SIZE):
batch = formatted_messages[
batch_start : batch_start + SLACK_BATCH_SIZE
]
# Build the document string
combined_document_string = build_document_metadata_markdown(
metadata_sections
# Build combined document string from all messages in this batch
combined_document_string = _build_batch_document_string(
team_name=team_name,
team_id=team_id,
channel_name=channel_name,
channel_id=channel_id,
messages=batch,
)
# Generate unique identifier hash for this Slack message
unique_identifier = f"{channel_id}_{msg_ts}"
# Generate unique identifier for this batch using
# channel_id + first message ts + last message ts
first_msg_ts = batch[0].get("timestamp", "")
last_msg_ts = batch[-1].get("timestamp", "")
unique_identifier = f"{channel_id}_{first_msg_ts}_{last_msg_ts}"
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.SLACK_CONNECTOR, unique_identifier, search_space_id
DocumentType.SLACK_CONNECTOR,
unique_identifier,
search_space_id,
)
# Generate content hash
@ -318,25 +400,31 @@ async def index_slack_messages(
existing_document.status, DocumentStatus.READY
):
existing_document.status = DocumentStatus.ready()
logger.info(
f"Document for Slack message {msg_ts} in channel {channel_name} unchanged. Skipping."
)
documents_skipped += 1
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append(
batches_to_process.append(
{
"document": existing_document,
"is_new": False,
"combined_document_string": combined_document_string,
"content_hash": content_hash,
"team_name": team_name,
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"msg_ts": msg_ts,
"first_message_ts": first_msg_ts,
"last_message_ts": last_msg_ts,
"first_message_time": batch[0].get(
"datetime", "Unknown"
),
"last_message_time": batch[-1].get(
"datetime", "Unknown"
),
"message_count": len(batch),
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
}
)
continue
@ -350,22 +438,27 @@ async def index_slack_messages(
if duplicate_by_content:
logger.info(
f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector "
f"Slack batch ({len(batch)} msgs) in {team_name}#{channel_name} already indexed by another connector "
f"(existing document ID: {duplicate_by_content.id}, "
f"type: {duplicate_by_content.document_type}). Skipping."
)
duplicate_content_count += 1
documents_skipped += 1
continue
# Create new document with PENDING status (visible in UI immediately)
document = Document(
search_space_id=search_space_id,
title=channel_name,
title=f"{team_name}#{channel_name}",
document_type=DocumentType.SLACK_CONNECTOR,
document_metadata={
"team_name": team_name,
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"msg_ts": msg_ts,
"first_message_ts": first_msg_ts,
"last_message_ts": last_msg_ts,
"message_count": len(batch),
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
@ -381,23 +474,29 @@ async def index_slack_messages(
session.add(document)
new_documents_created = True
messages_to_process.append(
batches_to_process.append(
{
"document": document,
"is_new": True,
"combined_document_string": combined_document_string,
"content_hash": content_hash,
"team_name": team_name,
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"msg_ts": msg_ts,
"first_message_ts": first_msg_ts,
"last_message_ts": last_msg_ts,
"first_message_time": batch[0].get("datetime", "Unknown"),
"last_message_time": batch[-1].get("datetime", "Unknown"),
"message_count": len(batch),
"start_date": start_date_str,
"end_date": end_date_str,
"message_count": len(formatted_messages),
}
)
logger.info(
f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}"
f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}, "
f"grouped into {(len(formatted_messages) + SLACK_BATCH_SIZE - 1) // SLACK_BATCH_SIZE} batch(es)"
)
except SlackApiError as slack_error:
@ -416,17 +515,18 @@ async def index_slack_messages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
f"Phase 1: Committing {len([b for b in batches_to_process if b['is_new']])} pending batch documents "
f"({total_messages_collected} total messages across all channels)"
)
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# PHASE 2: Process each batch document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
logger.info(f"Phase 2: Processing {len(batches_to_process)} batch documents")
for item in messages_to_process:
for item in batches_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
@ -447,16 +547,22 @@ async def index_slack_messages(
)
# Update document to READY with actual content
document.title = item["channel_name"]
document.title = f"{item['team_name']}#{item['channel_name']}"
document.content = item["combined_document_string"]
document.content_hash = item["content_hash"]
document.embedding = doc_embedding
document.document_metadata = {
"team_name": item["team_name"],
"team_id": item["team_id"],
"channel_name": item["channel_name"],
"channel_id": item["channel_id"],
"first_message_ts": item["first_message_ts"],
"last_message_ts": item["last_message_ts"],
"first_message_time": item["first_message_time"],
"last_message_time": item["last_message_time"],
"message_count": item["message_count"],
"start_date": item["start_date"],
"end_date": item["end_date"],
"message_count": item["message_count"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"connector_id": connector_id,
}
@ -469,13 +575,13 @@ async def index_slack_messages(
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
f"Committing batch: {documents_indexed} Slack messages processed so far"
f"Committing batch: {documents_indexed} batch documents processed so far"
)
await session.commit()
except Exception as e:
logger.error(
f"Error processing Slack message {item.get('msg_ts', 'Unknown')}: {e!s}",
f"Error processing Slack batch document: {e!s}",
exc_info=True,
)
# Mark document as failed with reason (visible in UI)
@ -493,7 +599,10 @@ async def index_slack_messages(
await update_connector_last_indexed(session, connector, update_last_indexed)
# Final commit for any remaining documents not yet committed in batches
logger.info(f"Final commit: Total {documents_indexed} Slack messages processed")
logger.info(
f"Final commit: Total {documents_indexed} batch documents processed "
f"(from {total_messages_collected} messages)"
)
try:
await session.commit()
logger.info("Successfully committed all Slack document changes to database")
@ -514,8 +623,12 @@ async def index_slack_messages(
# Build warning message if there were issues
warning_parts = []
if duplicate_content_count > 0:
warning_parts.append(f"{duplicate_content_count} duplicate")
if documents_failed > 0:
warning_parts.append(f"{documents_failed} failed")
if skipped_channels:
warning_parts.append(f"{len(skipped_channels)} channels skipped")
warning_message = ", ".join(warning_parts) if warning_parts else None
# Log success
@ -527,13 +640,20 @@ async def index_slack_messages(
"documents_indexed": documents_indexed,
"documents_skipped": documents_skipped,
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
"skipped_channels_count": len(skipped_channels),
"total_messages_collected": total_messages_collected,
"batch_size": SLACK_BATCH_SIZE,
"team_id": team_id,
"team_name": team_name,
},
)
logger.info(
f"Slack indexing completed: {documents_indexed} ready, "
f"{documents_skipped} skipped, {documents_failed} failed"
f"Slack indexing completed for workspace {team_name}: "
f"{documents_indexed} batch docs ready (from {total_messages_collected} messages), "
f"{documents_skipped} skipped, {documents_failed} failed "
f"({duplicate_content_count} duplicate content)"
)
return documents_indexed, warning_message

View file

@ -1,7 +1,10 @@
"""
Microsoft Teams connector indexer.
Implements 2-phase document status updates for real-time UI feedback:
Implements batch indexing: groups up to TEAMS_BATCH_SIZE messages per channel
into a single document for efficient indexing and better conversational context.
Uses 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
"""
@ -41,6 +44,72 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]]
# Heartbeat interval in seconds - update notification every 30 seconds
HEARTBEAT_INTERVAL_SECONDS = 30
# Number of messages to combine into a single document for batch indexing.
# Grouping messages improves conversational context in embeddings/chunks and
# drastically reduces the number of documents, embedding calls, and DB overhead.
TEAMS_BATCH_SIZE = 100
def _build_batch_document_string(
team_name: str,
team_id: str,
channel_name: str,
channel_id: str,
messages: list[dict],
) -> str:
"""
Combine multiple Teams messages into a single document string.
Each message is formatted with its timestamp and author, and all messages
are concatenated into a conversation-style document. The chunker will
later split this into overlapping windows of ~8-10 consecutive messages,
preserving conversational context in each chunk's embedding.
Args:
team_name: Name of the Microsoft Team
team_id: ID of the Microsoft Team
channel_name: Name of the channel
channel_id: ID of the channel
messages: List of formatted message dicts with 'user_name', 'created_datetime', 'content'
Returns:
Formatted document string with metadata and conversation content
"""
first_msg_time = messages[0].get("created_datetime", "Unknown")
last_msg_time = messages[-1].get("created_datetime", "Unknown")
metadata_lines = [
f"TEAM_NAME: {team_name}",
f"TEAM_ID: {team_id}",
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
f"MESSAGE_COUNT: {len(messages)}",
f"FIRST_MESSAGE_TIME: {first_msg_time}",
f"LAST_MESSAGE_TIME: {last_msg_time}",
]
conversation_lines = []
for msg in messages:
author = msg.get("user_name", "Unknown User")
timestamp = msg.get("created_datetime", "Unknown Time")
content = msg.get("content", "")
conversation_lines.append(f"[{timestamp}] {author}: {content}")
metadata_sections = [
("METADATA", metadata_lines),
(
"CONTENT",
[
"FORMAT: markdown",
"TEXT_START",
"\n".join(conversation_lines),
"TEXT_END",
],
),
]
return build_document_metadata_markdown(metadata_sections)
async def index_teams_messages(
session: AsyncSession,
@ -55,6 +124,12 @@ async def index_teams_messages(
"""
Index Microsoft Teams messages from all accessible teams and channels.
Messages are grouped into batches of TEAMS_BATCH_SIZE per channel,
so each document contains up to 100 consecutive messages with full
conversational context. This reduces document count, embedding calls,
and DB overhead by ~100x while improving search quality through
context-aware chunk embeddings.
Implements 2-phase document status updates for real-time UI feedback:
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
- Phase 2: Process each document: pending processing ready/failed
@ -184,6 +259,7 @@ async def index_teams_messages(
documents_skipped = 0
documents_failed = 0
duplicate_content_count = 0
total_messages_collected = 0
skipped_channels = []
# Heartbeat tracking - update notification periodically to prevent appearing stuck
@ -199,21 +275,21 @@ async def index_teams_messages(
start_datetime = None
end_datetime = None
if start_date_str:
# Parse as naive datetime and make it timezone-aware (UTC)
start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d").replace(
tzinfo=UTC
)
if end_date_str:
# Parse as naive datetime, set to end of day, and make it timezone-aware (UTC)
end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d").replace(
hour=23, minute=59, second=59, tzinfo=UTC
)
# =======================================================================
# PHASE 1: Collect all messages and create pending documents
# This makes ALL documents visible in the UI immediately with pending status
# PHASE 1: Collect messages, group into batches, and create pending documents
# Messages are grouped into batches of TEAMS_BATCH_SIZE per channel.
# Each batch becomes a single document with full conversational context.
# All documents are visible in the UI immediately with pending status.
# =======================================================================
messages_to_process = [] # List of dicts with document and message data
batches_to_process = [] # List of dicts with document and batch data
new_documents_created = False
for team in teams:
@ -251,65 +327,72 @@ async def index_teams_messages(
)
continue
# Process each message
# Format messages for batching
formatted_messages = []
for msg in messages:
# Skip deleted messages or empty content
if msg.get("deletedDateTime"):
continue
# Extract message details
message_id = msg.get("id", "")
created_datetime = msg.get("createdDateTime", "")
from_user = msg.get("from", {})
user_name = from_user.get("user", {}).get(
"displayName", "Unknown User"
)
user_email = from_user.get("user", {}).get(
"userPrincipalName", "Unknown Email"
)
# Extract message content
body = msg.get("body", {})
content_type = body.get("contentType", "text")
msg_text = body.get("content", "")
# Skip empty messages
if not msg_text or msg_text.strip() == "":
continue
# Format document metadata
metadata_sections = [
(
"METADATA",
[
f"TEAM_NAME: {team_name}",
f"TEAM_ID: {team_id}",
f"CHANNEL_NAME: {channel_name}",
f"CHANNEL_ID: {channel_id}",
f"MESSAGE_TIMESTAMP: {created_datetime}",
f"MESSAGE_USER_NAME: {user_name}",
f"MESSAGE_USER_EMAIL: {user_email}",
f"CONTENT_TYPE: {content_type}",
],
),
(
"CONTENT",
[
f"FORMAT: {content_type}",
"TEXT_START",
msg_text,
"TEXT_END",
],
),
]
# Build the document string
combined_document_string = build_document_metadata_markdown(
metadata_sections
formatted_messages.append(
{
"message_id": msg.get("id", ""),
"created_datetime": msg.get("createdDateTime", ""),
"user_name": user_name,
"content": msg_text,
}
)
# Generate unique identifier hash for this Teams message
unique_identifier = f"{team_id}_{channel_id}_{message_id}"
if not formatted_messages:
logger.info(
"No valid messages found in channel %s of team %s after filtering.",
channel_name,
team_name,
)
documents_skipped += 1
continue
total_messages_collected += len(formatted_messages)
# =======================================================
# Group messages into batches of TEAMS_BATCH_SIZE
# Each batch becomes a single document with conversation context
# =======================================================
for batch_start in range(
0, len(formatted_messages), TEAMS_BATCH_SIZE
):
batch = formatted_messages[
batch_start : batch_start + TEAMS_BATCH_SIZE
]
# Build combined document string from all messages in this batch
combined_document_string = _build_batch_document_string(
team_name=team_name,
team_id=team_id,
channel_name=channel_name,
channel_id=channel_id,
messages=batch,
)
# Generate unique identifier for this batch using
# team_id + channel_id + first message id + last message id
first_msg_id = batch[0].get("message_id", "")
last_msg_id = batch[-1].get("message_id", "")
unique_identifier = (
f"{team_id}_{channel_id}_{first_msg_id}_{last_msg_id}"
)
unique_identifier_hash = generate_unique_identifier_hash(
DocumentType.TEAMS_CONNECTOR,
unique_identifier,
@ -331,7 +414,6 @@ async def index_teams_messages(
if existing_document:
# Document exists - check if content has changed
if existing_document.content_hash == content_hash:
# Ensure status is ready (might have been stuck in processing/pending)
if not DocumentStatus.is_state(
existing_document.status, DocumentStatus.READY
):
@ -342,7 +424,7 @@ async def index_teams_messages(
continue
# Queue existing document for update (will be set to processing in Phase 2)
messages_to_process.append(
batches_to_process.append(
{
"document": existing_document,
"is_new": False,
@ -352,14 +434,21 @@ async def index_teams_messages(
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": message_id,
"first_message_id": first_msg_id,
"last_message_id": last_msg_id,
"first_message_time": batch[0].get(
"created_datetime", "Unknown"
),
"last_message_time": batch[-1].get(
"created_datetime", "Unknown"
),
"message_count": len(batch),
"start_date": start_date_str,
"end_date": end_date_str,
}
)
continue
# Document doesn't exist by unique_identifier_hash
# Check if a document with the same content_hash exists (from another connector)
with session.no_autoflush:
duplicate_by_content = (
@ -370,9 +459,10 @@ async def index_teams_messages(
if duplicate_by_content:
logger.info(
"Teams message %s in channel %s already indexed by another connector "
"Teams batch (%s msgs) in %s/%s already indexed by another connector "
"(existing document ID: %s, type: %s). Skipping.",
message_id,
len(batch),
team_name,
channel_name,
duplicate_by_content.id,
duplicate_by_content.document_type,
@ -391,6 +481,9 @@ async def index_teams_messages(
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"first_message_id": first_msg_id,
"last_message_id": last_msg_id,
"message_count": len(batch),
"connector_id": connector_id,
},
content="Pending...", # Placeholder until processed
@ -406,7 +499,7 @@ async def index_teams_messages(
session.add(document)
new_documents_created = True
messages_to_process.append(
batches_to_process.append(
{
"document": document,
"is_new": True,
@ -416,12 +509,30 @@ async def index_teams_messages(
"team_id": team_id,
"channel_name": channel_name,
"channel_id": channel_id,
"message_id": message_id,
"first_message_id": first_msg_id,
"last_message_id": last_msg_id,
"first_message_time": batch[0].get(
"created_datetime", "Unknown"
),
"last_message_time": batch[-1].get(
"created_datetime", "Unknown"
),
"message_count": len(batch),
"start_date": start_date_str,
"end_date": end_date_str,
}
)
logger.info(
"Phase 1: Collected %s messages from %s/%s, "
"grouped into %s batch(es)",
len(formatted_messages),
team_name,
channel_name,
(len(formatted_messages) + TEAMS_BATCH_SIZE - 1)
// TEAMS_BATCH_SIZE,
)
except Exception as e:
logger.error(
"Error processing channel %s in team %s: %s",
@ -441,17 +552,20 @@ async def index_teams_messages(
# Commit all pending documents - they all appear in UI now
if new_documents_created:
logger.info(
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
"Phase 1: Committing %s pending batch documents "
"(%s total messages across all channels)",
len([b for b in batches_to_process if b["is_new"]]),
total_messages_collected,
)
await session.commit()
# =======================================================================
# PHASE 2: Process each document one by one
# PHASE 2: Process each batch document one by one
# Each document transitions: pending → processing → ready/failed
# =======================================================================
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
logger.info("Phase 2: Processing %s batch documents", len(batches_to_process))
for item in messages_to_process:
for item in batches_to_process:
# Send heartbeat periodically
if on_heartbeat_callback:
current_time = time.time()
@ -481,6 +595,11 @@ async def index_teams_messages(
"team_id": item["team_id"],
"channel_name": item["channel_name"],
"channel_id": item["channel_id"],
"first_message_id": item["first_message_id"],
"last_message_id": item["last_message_id"],
"first_message_time": item["first_message_time"],
"last_message_time": item["last_message_time"],
"message_count": item["message_count"],
"start_date": item["start_date"],
"end_date": item["end_date"],
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
@ -495,20 +614,25 @@ async def index_teams_messages(
# Batch commit every 10 documents (for ready status updates)
if documents_indexed % 10 == 0:
logger.info(
"Committing batch: %s Teams messages processed so far",
"Committing batch: %s Teams batch documents processed so far",
documents_indexed,
)
await session.commit()
except Exception as e:
logger.error(f"Error processing Teams message: {e!s}", exc_info=True)
logger.error(
"Error processing Teams batch document: %s",
str(e),
exc_info=True,
)
# Mark document as failed with reason (visible in UI)
try:
document.status = DocumentStatus.failed(str(e))
document.updated_at = get_current_timestamp()
except Exception as status_error:
logger.error(
f"Failed to update document status to failed: {status_error}"
"Failed to update document status to failed: %s",
str(status_error),
)
documents_failed += 1
continue
@ -518,7 +642,9 @@ async def index_teams_messages(
# Final commit for any remaining documents not yet committed in batches
logger.info(
"Final commit: Total %s Teams messages processed", documents_indexed
"Final commit: Total %s Teams batch documents processed (from %s messages)",
documents_indexed,
total_messages_collected,
)
try:
await session.commit()
@ -530,8 +656,9 @@ async def index_teams_messages(
or "uniqueviolationerror" in str(e).lower()
):
logger.warning(
f"Duplicate content_hash detected during final commit. "
f"Rolling back and continuing. Error: {e!s}"
"Duplicate content_hash detected during final commit. "
"Rolling back and continuing. Error: %s",
str(e),
)
await session.rollback()
else:
@ -557,13 +684,16 @@ async def index_teams_messages(
"documents_failed": documents_failed,
"duplicate_content_count": duplicate_content_count,
"skipped_channels_count": len(skipped_channels),
"total_messages_collected": total_messages_collected,
"batch_size": TEAMS_BATCH_SIZE,
},
)
logger.info(
"Teams indexing completed: %s ready, %s skipped, %s failed "
"(%s duplicate content)",
"Teams indexing completed: %s batch docs ready (from %s messages), "
"%s skipped, %s failed (%s duplicate content)",
documents_indexed,
total_messages_collected,
documents_skipped,
documents_failed,
duplicate_content_count,