mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-02 19:55:18 +02:00
feat: implement batch indexing for Microsoft Teams messages to improve efficiency and conversational context
This commit is contained in:
parent
e053b1b988
commit
20ab128b05
1 changed files with 196 additions and 66 deletions
|
|
@ -1,7 +1,10 @@
|
|||
"""
|
||||
Microsoft Teams connector indexer.
|
||||
|
||||
Implements 2-phase document status updates for real-time UI feedback:
|
||||
Implements batch indexing: groups up to TEAMS_BATCH_SIZE messages per channel
|
||||
into a single document for efficient indexing and better conversational context.
|
||||
|
||||
Uses 2-phase document status updates for real-time UI feedback:
|
||||
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
|
||||
- Phase 2: Process each document: pending → processing → ready/failed
|
||||
"""
|
||||
|
|
@ -41,6 +44,72 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]]
|
|||
# Heartbeat interval in seconds - update notification every 30 seconds
|
||||
HEARTBEAT_INTERVAL_SECONDS = 30
|
||||
|
||||
# Number of messages to combine into a single document for batch indexing.
|
||||
# Grouping messages improves conversational context in embeddings/chunks and
|
||||
# drastically reduces the number of documents, embedding calls, and DB overhead.
|
||||
TEAMS_BATCH_SIZE = 100
|
||||
|
||||
|
||||
def _build_batch_document_string(
|
||||
team_name: str,
|
||||
team_id: str,
|
||||
channel_name: str,
|
||||
channel_id: str,
|
||||
messages: list[dict],
|
||||
) -> str:
|
||||
"""
|
||||
Combine multiple Teams messages into a single document string.
|
||||
|
||||
Each message is formatted with its timestamp and author, and all messages
|
||||
are concatenated into a conversation-style document. The chunker will
|
||||
later split this into overlapping windows of ~8-10 consecutive messages,
|
||||
preserving conversational context in each chunk's embedding.
|
||||
|
||||
Args:
|
||||
team_name: Name of the Microsoft Team
|
||||
team_id: ID of the Microsoft Team
|
||||
channel_name: Name of the channel
|
||||
channel_id: ID of the channel
|
||||
messages: List of formatted message dicts with 'user_name', 'created_datetime', 'content'
|
||||
|
||||
Returns:
|
||||
Formatted document string with metadata and conversation content
|
||||
"""
|
||||
first_msg_time = messages[0].get("created_datetime", "Unknown")
|
||||
last_msg_time = messages[-1].get("created_datetime", "Unknown")
|
||||
|
||||
metadata_lines = [
|
||||
f"TEAM_NAME: {team_name}",
|
||||
f"TEAM_ID: {team_id}",
|
||||
f"CHANNEL_NAME: {channel_name}",
|
||||
f"CHANNEL_ID: {channel_id}",
|
||||
f"MESSAGE_COUNT: {len(messages)}",
|
||||
f"FIRST_MESSAGE_TIME: {first_msg_time}",
|
||||
f"LAST_MESSAGE_TIME: {last_msg_time}",
|
||||
]
|
||||
|
||||
conversation_lines = []
|
||||
for msg in messages:
|
||||
author = msg.get("user_name", "Unknown User")
|
||||
timestamp = msg.get("created_datetime", "Unknown Time")
|
||||
content = msg.get("content", "")
|
||||
conversation_lines.append(f"[{timestamp}] {author}: {content}")
|
||||
|
||||
metadata_sections = [
|
||||
("METADATA", metadata_lines),
|
||||
(
|
||||
"CONTENT",
|
||||
[
|
||||
"FORMAT: markdown",
|
||||
"TEXT_START",
|
||||
"\n".join(conversation_lines),
|
||||
"TEXT_END",
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
return build_document_metadata_markdown(metadata_sections)
|
||||
|
||||
|
||||
async def index_teams_messages(
|
||||
session: AsyncSession,
|
||||
|
|
@ -55,6 +124,12 @@ async def index_teams_messages(
|
|||
"""
|
||||
Index Microsoft Teams messages from all accessible teams and channels.
|
||||
|
||||
Messages are grouped into batches of TEAMS_BATCH_SIZE per channel,
|
||||
so each document contains up to 100 consecutive messages with full
|
||||
conversational context. This reduces document count, embedding calls,
|
||||
and DB overhead by ~100x while improving search quality through
|
||||
context-aware chunk embeddings.
|
||||
|
||||
Implements 2-phase document status updates for real-time UI feedback:
|
||||
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
|
||||
- Phase 2: Process each document: pending → processing → ready/failed
|
||||
|
|
@ -184,6 +259,7 @@ async def index_teams_messages(
|
|||
documents_skipped = 0
|
||||
documents_failed = 0
|
||||
duplicate_content_count = 0
|
||||
total_messages_collected = 0
|
||||
skipped_channels = []
|
||||
|
||||
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
||||
|
|
@ -199,21 +275,21 @@ async def index_teams_messages(
|
|||
start_datetime = None
|
||||
end_datetime = None
|
||||
if start_date_str:
|
||||
# Parse as naive datetime and make it timezone-aware (UTC)
|
||||
start_datetime = datetime.strptime(start_date_str, "%Y-%m-%d").replace(
|
||||
tzinfo=UTC
|
||||
)
|
||||
if end_date_str:
|
||||
# Parse as naive datetime, set to end of day, and make it timezone-aware (UTC)
|
||||
end_datetime = datetime.strptime(end_date_str, "%Y-%m-%d").replace(
|
||||
hour=23, minute=59, second=59, tzinfo=UTC
|
||||
)
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 1: Collect all messages and create pending documents
|
||||
# This makes ALL documents visible in the UI immediately with pending status
|
||||
# PHASE 1: Collect messages, group into batches, and create pending documents
|
||||
# Messages are grouped into batches of TEAMS_BATCH_SIZE per channel.
|
||||
# Each batch becomes a single document with full conversational context.
|
||||
# All documents are visible in the UI immediately with pending status.
|
||||
# =======================================================================
|
||||
messages_to_process = [] # List of dicts with document and message data
|
||||
batches_to_process = [] # List of dicts with document and batch data
|
||||
new_documents_created = False
|
||||
|
||||
for team in teams:
|
||||
|
|
@ -251,65 +327,72 @@ async def index_teams_messages(
|
|||
)
|
||||
continue
|
||||
|
||||
# Process each message
|
||||
# Format messages for batching
|
||||
formatted_messages = []
|
||||
for msg in messages:
|
||||
# Skip deleted messages or empty content
|
||||
if msg.get("deletedDateTime"):
|
||||
continue
|
||||
|
||||
# Extract message details
|
||||
message_id = msg.get("id", "")
|
||||
created_datetime = msg.get("createdDateTime", "")
|
||||
from_user = msg.get("from", {})
|
||||
user_name = from_user.get("user", {}).get(
|
||||
"displayName", "Unknown User"
|
||||
)
|
||||
user_email = from_user.get("user", {}).get(
|
||||
"userPrincipalName", "Unknown Email"
|
||||
)
|
||||
|
||||
# Extract message content
|
||||
body = msg.get("body", {})
|
||||
content_type = body.get("contentType", "text")
|
||||
msg_text = body.get("content", "")
|
||||
|
||||
# Skip empty messages
|
||||
if not msg_text or msg_text.strip() == "":
|
||||
continue
|
||||
|
||||
# Format document metadata
|
||||
metadata_sections = [
|
||||
(
|
||||
"METADATA",
|
||||
[
|
||||
f"TEAM_NAME: {team_name}",
|
||||
f"TEAM_ID: {team_id}",
|
||||
f"CHANNEL_NAME: {channel_name}",
|
||||
f"CHANNEL_ID: {channel_id}",
|
||||
f"MESSAGE_TIMESTAMP: {created_datetime}",
|
||||
f"MESSAGE_USER_NAME: {user_name}",
|
||||
f"MESSAGE_USER_EMAIL: {user_email}",
|
||||
f"CONTENT_TYPE: {content_type}",
|
||||
],
|
||||
),
|
||||
(
|
||||
"CONTENT",
|
||||
[
|
||||
f"FORMAT: {content_type}",
|
||||
"TEXT_START",
|
||||
msg_text,
|
||||
"TEXT_END",
|
||||
],
|
||||
),
|
||||
]
|
||||
|
||||
# Build the document string
|
||||
combined_document_string = build_document_metadata_markdown(
|
||||
metadata_sections
|
||||
formatted_messages.append(
|
||||
{
|
||||
"message_id": msg.get("id", ""),
|
||||
"created_datetime": msg.get("createdDateTime", ""),
|
||||
"user_name": user_name,
|
||||
"content": msg_text,
|
||||
}
|
||||
)
|
||||
|
||||
# Generate unique identifier hash for this Teams message
|
||||
unique_identifier = f"{team_id}_{channel_id}_{message_id}"
|
||||
if not formatted_messages:
|
||||
logger.info(
|
||||
"No valid messages found in channel %s of team %s after filtering.",
|
||||
channel_name,
|
||||
team_name,
|
||||
)
|
||||
documents_skipped += 1
|
||||
continue
|
||||
|
||||
total_messages_collected += len(formatted_messages)
|
||||
|
||||
# =======================================================
|
||||
# Group messages into batches of TEAMS_BATCH_SIZE
|
||||
# Each batch becomes a single document with conversation context
|
||||
# =======================================================
|
||||
for batch_start in range(
|
||||
0, len(formatted_messages), TEAMS_BATCH_SIZE
|
||||
):
|
||||
batch = formatted_messages[
|
||||
batch_start : batch_start + TEAMS_BATCH_SIZE
|
||||
]
|
||||
|
||||
# Build combined document string from all messages in this batch
|
||||
combined_document_string = _build_batch_document_string(
|
||||
team_name=team_name,
|
||||
team_id=team_id,
|
||||
channel_name=channel_name,
|
||||
channel_id=channel_id,
|
||||
messages=batch,
|
||||
)
|
||||
|
||||
# Generate unique identifier for this batch using
|
||||
# team_id + channel_id + first message id + last message id
|
||||
first_msg_id = batch[0].get("message_id", "")
|
||||
last_msg_id = batch[-1].get("message_id", "")
|
||||
unique_identifier = (
|
||||
f"{team_id}_{channel_id}_{first_msg_id}_{last_msg_id}"
|
||||
)
|
||||
unique_identifier_hash = generate_unique_identifier_hash(
|
||||
DocumentType.TEAMS_CONNECTOR,
|
||||
unique_identifier,
|
||||
|
|
@ -331,7 +414,6 @@ async def index_teams_messages(
|
|||
if existing_document:
|
||||
# Document exists - check if content has changed
|
||||
if existing_document.content_hash == content_hash:
|
||||
# Ensure status is ready (might have been stuck in processing/pending)
|
||||
if not DocumentStatus.is_state(
|
||||
existing_document.status, DocumentStatus.READY
|
||||
):
|
||||
|
|
@ -342,7 +424,7 @@ async def index_teams_messages(
|
|||
continue
|
||||
|
||||
# Queue existing document for update (will be set to processing in Phase 2)
|
||||
messages_to_process.append(
|
||||
batches_to_process.append(
|
||||
{
|
||||
"document": existing_document,
|
||||
"is_new": False,
|
||||
|
|
@ -352,14 +434,21 @@ async def index_teams_messages(
|
|||
"team_id": team_id,
|
||||
"channel_name": channel_name,
|
||||
"channel_id": channel_id,
|
||||
"message_id": message_id,
|
||||
"first_message_id": first_msg_id,
|
||||
"last_message_id": last_msg_id,
|
||||
"first_message_time": batch[0].get(
|
||||
"created_datetime", "Unknown"
|
||||
),
|
||||
"last_message_time": batch[-1].get(
|
||||
"created_datetime", "Unknown"
|
||||
),
|
||||
"message_count": len(batch),
|
||||
"start_date": start_date_str,
|
||||
"end_date": end_date_str,
|
||||
}
|
||||
)
|
||||
continue
|
||||
|
||||
# Document doesn't exist by unique_identifier_hash
|
||||
# Check if a document with the same content_hash exists (from another connector)
|
||||
with session.no_autoflush:
|
||||
duplicate_by_content = (
|
||||
|
|
@ -370,9 +459,10 @@ async def index_teams_messages(
|
|||
|
||||
if duplicate_by_content:
|
||||
logger.info(
|
||||
"Teams message %s in channel %s already indexed by another connector "
|
||||
"Teams batch (%s msgs) in %s/%s already indexed by another connector "
|
||||
"(existing document ID: %s, type: %s). Skipping.",
|
||||
message_id,
|
||||
len(batch),
|
||||
team_name,
|
||||
channel_name,
|
||||
duplicate_by_content.id,
|
||||
duplicate_by_content.document_type,
|
||||
|
|
@ -391,6 +481,9 @@ async def index_teams_messages(
|
|||
"team_id": team_id,
|
||||
"channel_name": channel_name,
|
||||
"channel_id": channel_id,
|
||||
"first_message_id": first_msg_id,
|
||||
"last_message_id": last_msg_id,
|
||||
"message_count": len(batch),
|
||||
"connector_id": connector_id,
|
||||
},
|
||||
content="Pending...", # Placeholder until processed
|
||||
|
|
@ -406,7 +499,7 @@ async def index_teams_messages(
|
|||
session.add(document)
|
||||
new_documents_created = True
|
||||
|
||||
messages_to_process.append(
|
||||
batches_to_process.append(
|
||||
{
|
||||
"document": document,
|
||||
"is_new": True,
|
||||
|
|
@ -416,12 +509,30 @@ async def index_teams_messages(
|
|||
"team_id": team_id,
|
||||
"channel_name": channel_name,
|
||||
"channel_id": channel_id,
|
||||
"message_id": message_id,
|
||||
"first_message_id": first_msg_id,
|
||||
"last_message_id": last_msg_id,
|
||||
"first_message_time": batch[0].get(
|
||||
"created_datetime", "Unknown"
|
||||
),
|
||||
"last_message_time": batch[-1].get(
|
||||
"created_datetime", "Unknown"
|
||||
),
|
||||
"message_count": len(batch),
|
||||
"start_date": start_date_str,
|
||||
"end_date": end_date_str,
|
||||
}
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Phase 1: Collected %s messages from %s/%s, "
|
||||
"grouped into %s batch(es)",
|
||||
len(formatted_messages),
|
||||
team_name,
|
||||
channel_name,
|
||||
(len(formatted_messages) + TEAMS_BATCH_SIZE - 1)
|
||||
// TEAMS_BATCH_SIZE,
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(
|
||||
"Error processing channel %s in team %s: %s",
|
||||
|
|
@ -441,17 +552,20 @@ async def index_teams_messages(
|
|||
# Commit all pending documents - they all appear in UI now
|
||||
if new_documents_created:
|
||||
logger.info(
|
||||
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
|
||||
"Phase 1: Committing %s pending batch documents "
|
||||
"(%s total messages across all channels)",
|
||||
len([b for b in batches_to_process if b["is_new"]]),
|
||||
total_messages_collected,
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
# =======================================================================
|
||||
# PHASE 2: Process each document one by one
|
||||
# PHASE 2: Process each batch document one by one
|
||||
# Each document transitions: pending → processing → ready/failed
|
||||
# =======================================================================
|
||||
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
|
||||
logger.info("Phase 2: Processing %s batch documents", len(batches_to_process))
|
||||
|
||||
for item in messages_to_process:
|
||||
for item in batches_to_process:
|
||||
# Send heartbeat periodically
|
||||
if on_heartbeat_callback:
|
||||
current_time = time.time()
|
||||
|
|
@ -481,6 +595,11 @@ async def index_teams_messages(
|
|||
"team_id": item["team_id"],
|
||||
"channel_name": item["channel_name"],
|
||||
"channel_id": item["channel_id"],
|
||||
"first_message_id": item["first_message_id"],
|
||||
"last_message_id": item["last_message_id"],
|
||||
"first_message_time": item["first_message_time"],
|
||||
"last_message_time": item["last_message_time"],
|
||||
"message_count": item["message_count"],
|
||||
"start_date": item["start_date"],
|
||||
"end_date": item["end_date"],
|
||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
|
|
@ -495,20 +614,25 @@ async def index_teams_messages(
|
|||
# Batch commit every 10 documents (for ready status updates)
|
||||
if documents_indexed % 10 == 0:
|
||||
logger.info(
|
||||
"Committing batch: %s Teams messages processed so far",
|
||||
"Committing batch: %s Teams batch documents processed so far",
|
||||
documents_indexed,
|
||||
)
|
||||
await session.commit()
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Teams message: {e!s}", exc_info=True)
|
||||
logger.error(
|
||||
"Error processing Teams batch document: %s",
|
||||
str(e),
|
||||
exc_info=True,
|
||||
)
|
||||
# Mark document as failed with reason (visible in UI)
|
||||
try:
|
||||
document.status = DocumentStatus.failed(str(e))
|
||||
document.updated_at = get_current_timestamp()
|
||||
except Exception as status_error:
|
||||
logger.error(
|
||||
f"Failed to update document status to failed: {status_error}"
|
||||
"Failed to update document status to failed: %s",
|
||||
str(status_error),
|
||||
)
|
||||
documents_failed += 1
|
||||
continue
|
||||
|
|
@ -518,7 +642,9 @@ async def index_teams_messages(
|
|||
|
||||
# Final commit for any remaining documents not yet committed in batches
|
||||
logger.info(
|
||||
"Final commit: Total %s Teams messages processed", documents_indexed
|
||||
"Final commit: Total %s Teams batch documents processed (from %s messages)",
|
||||
documents_indexed,
|
||||
total_messages_collected,
|
||||
)
|
||||
try:
|
||||
await session.commit()
|
||||
|
|
@ -530,8 +656,9 @@ async def index_teams_messages(
|
|||
or "uniqueviolationerror" in str(e).lower()
|
||||
):
|
||||
logger.warning(
|
||||
f"Duplicate content_hash detected during final commit. "
|
||||
f"Rolling back and continuing. Error: {e!s}"
|
||||
"Duplicate content_hash detected during final commit. "
|
||||
"Rolling back and continuing. Error: %s",
|
||||
str(e),
|
||||
)
|
||||
await session.rollback()
|
||||
else:
|
||||
|
|
@ -557,13 +684,16 @@ async def index_teams_messages(
|
|||
"documents_failed": documents_failed,
|
||||
"duplicate_content_count": duplicate_content_count,
|
||||
"skipped_channels_count": len(skipped_channels),
|
||||
"total_messages_collected": total_messages_collected,
|
||||
"batch_size": TEAMS_BATCH_SIZE,
|
||||
},
|
||||
)
|
||||
|
||||
logger.info(
|
||||
"Teams indexing completed: %s ready, %s skipped, %s failed "
|
||||
"(%s duplicate content)",
|
||||
"Teams indexing completed: %s batch docs ready (from %s messages), "
|
||||
"%s skipped, %s failed (%s duplicate content)",
|
||||
documents_indexed,
|
||||
total_messages_collected,
|
||||
documents_skipped,
|
||||
documents_failed,
|
||||
duplicate_content_count,
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue