mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-19 18:45:15 +02:00
feat: implement batch indexing for Slack messages to enhance efficiency and conversational context
This commit is contained in:
parent
98870a9f9a
commit
7cede99d29
1 changed files with 200 additions and 62 deletions
|
|
@ -1,7 +1,10 @@
|
||||||
"""
|
"""
|
||||||
Slack connector indexer.
|
Slack connector indexer.
|
||||||
|
|
||||||
Implements 2-phase document status updates for real-time UI feedback:
|
Implements batch indexing: groups up to SLACK_BATCH_SIZE messages per channel
|
||||||
|
into a single document for efficient indexing and better conversational context.
|
||||||
|
|
||||||
|
Uses 2-phase document status updates for real-time UI feedback:
|
||||||
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
|
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
|
||||||
- Phase 2: Process each document: pending → processing → ready/failed
|
- Phase 2: Process each document: pending → processing → ready/failed
|
||||||
"""
|
"""
|
||||||
|
|
@ -42,6 +45,72 @@ HeartbeatCallbackType = Callable[[int], Awaitable[None]]
|
||||||
# Heartbeat interval in seconds - update notification every 30 seconds
|
# Heartbeat interval in seconds - update notification every 30 seconds
|
||||||
HEARTBEAT_INTERVAL_SECONDS = 30
|
HEARTBEAT_INTERVAL_SECONDS = 30
|
||||||
|
|
||||||
|
# Number of messages to combine into a single document for batch indexing.
|
||||||
|
# Grouping messages improves conversational context in embeddings/chunks and
|
||||||
|
# drastically reduces the number of documents, embedding calls, and DB overhead.
|
||||||
|
SLACK_BATCH_SIZE = 100
|
||||||
|
|
||||||
|
|
||||||
|
def _build_batch_document_string(
    team_name: str,
    team_id: str,
    channel_name: str,
    channel_id: str,
    messages: list[dict],
) -> str:
    """
    Combine multiple Slack messages into a single document string.

    The document has a METADATA section (workspace, channel, message count
    and the time span covered by the batch) followed by a CONTENT section in
    which each message is rendered, in the order given, as
    ``[<datetime>] <user_name>: <text>``.

    NOTE(review): the original author states that the downstream chunker
    splits this text into overlapping windows of ~8-10 consecutive messages;
    that behavior is not visible from this function - confirm against the
    chunker before relying on it.

    Args:
        team_name: Name of the Slack workspace
        team_id: ID of the Slack workspace
        channel_name: Name of the channel
        channel_id: ID of the channel
        messages: List of formatted message dicts. The keys 'user_name',
            'datetime' and 'text' are read; each falls back to a placeholder
            when absent. An empty list is accepted.

    Returns:
        The string returned by build_document_metadata_markdown for the
        METADATA and CONTENT sections built here.
    """
    # An empty batch has no first/last message. Fall back to the same
    # "Unknown" placeholder used for a message without a 'datetime' key
    # rather than raising IndexError on messages[0].
    if messages:
        first_msg_time = messages[0].get("datetime", "Unknown")
        last_msg_time = messages[-1].get("datetime", "Unknown")
    else:
        first_msg_time = last_msg_time = "Unknown"

    metadata_lines = [
        f"WORKSPACE_NAME: {team_name}",
        f"WORKSPACE_ID: {team_id}",
        f"CHANNEL_NAME: {channel_name}",
        f"CHANNEL_ID: {channel_id}",
        f"MESSAGE_COUNT: {len(messages)}",
        f"FIRST_MESSAGE_TIME: {first_msg_time}",
        f"LAST_MESSAGE_TIME: {last_msg_time}",
    ]

    # One "[timestamp] author: text" entry per message, preserving input order.
    conversation_lines = [
        f"[{msg.get('datetime', 'Unknown Time')}] "
        f"{msg.get('user_name', 'Unknown User')}: "
        f"{msg.get('text', '')}"
        for msg in messages
    ]

    metadata_sections = [
        ("METADATA", metadata_lines),
        (
            "CONTENT",
            [
                "FORMAT: markdown",
                "TEXT_START",
                "\n".join(conversation_lines),
                "TEXT_END",
            ],
        ),
    ]

    return build_document_metadata_markdown(metadata_sections)
|
||||||
|
|
||||||
|
|
||||||
async def index_slack_messages(
|
async def index_slack_messages(
|
||||||
session: AsyncSession,
|
session: AsyncSession,
|
||||||
|
|
@ -56,6 +125,16 @@ async def index_slack_messages(
|
||||||
"""
|
"""
|
||||||
Index Slack messages from all accessible channels.
|
Index Slack messages from all accessible channels.
|
||||||
|
|
||||||
|
Messages are grouped into batches of SLACK_BATCH_SIZE per channel,
|
||||||
|
so each document contains up to SLACK_BATCH_SIZE (100) consecutive messages with full
|
||||||
|
conversational context. This reduces document count, embedding calls,
|
||||||
|
and DB overhead by ~100x while improving search quality through
|
||||||
|
context-aware chunk embeddings.
|
||||||
|
|
||||||
|
Implements 2-phase document status updates for real-time UI feedback:
|
||||||
|
- Phase 1: Create all documents with 'pending' status (visible in UI immediately)
|
||||||
|
- Phase 2: Process each document: pending → processing → ready/failed
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
session: Database session
|
session: Database session
|
||||||
connector_id: ID of the Slack connector
|
connector_id: ID of the Slack connector
|
||||||
|
|
@ -109,6 +188,10 @@ async def index_slack_messages(
|
||||||
f"Connector with ID {connector_id} not found or is not a Slack connector",
|
f"Connector with ID {connector_id} not found or is not a Slack connector",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Extract workspace info from connector config
|
||||||
|
team_id = connector.config.get("team_id", "")
|
||||||
|
team_name = connector.config.get("team_name", "Unknown Workspace")
|
||||||
|
|
||||||
# Note: Token handling is now done automatically by SlackHistory
|
# Note: Token handling is now done automatically by SlackHistory
|
||||||
# with auto-refresh support. We just need to pass session and connector_id.
|
# with auto-refresh support. We just need to pass session and connector_id.
|
||||||
|
|
||||||
|
|
@ -182,6 +265,8 @@ async def index_slack_messages(
|
||||||
documents_indexed = 0
|
documents_indexed = 0
|
||||||
documents_skipped = 0
|
documents_skipped = 0
|
||||||
documents_failed = 0 # Track messages that failed processing
|
documents_failed = 0 # Track messages that failed processing
|
||||||
|
duplicate_content_count = 0
|
||||||
|
total_messages_collected = 0
|
||||||
skipped_channels = []
|
skipped_channels = []
|
||||||
|
|
||||||
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
# Heartbeat tracking - update notification periodically to prevent appearing stuck
|
||||||
|
|
@ -194,10 +279,12 @@ async def index_slack_messages(
|
||||||
)
|
)
|
||||||
|
|
||||||
# =======================================================================
|
# =======================================================================
|
||||||
# PHASE 1: Collect all messages from all channels, create pending documents
|
# PHASE 1: Collect messages, group into batches, and create pending documents
|
||||||
# This makes ALL documents visible in the UI immediately with pending status
|
# Messages are grouped into batches of SLACK_BATCH_SIZE per channel.
|
||||||
|
# Each batch becomes a single document with full conversational context.
|
||||||
|
# All documents are visible in the UI immediately with pending status.
|
||||||
# =======================================================================
|
# =======================================================================
|
||||||
messages_to_process = [] # List of dicts with document and message data
|
batches_to_process = [] # List of dicts with document and batch data
|
||||||
new_documents_created = False
|
new_documents_created = False
|
||||||
|
|
||||||
for channel_obj in channels:
|
for channel_obj in channels:
|
||||||
|
|
@ -264,40 +351,39 @@ async def index_slack_messages(
|
||||||
documents_skipped += 1
|
documents_skipped += 1
|
||||||
continue # Skip if no valid messages after filtering
|
continue # Skip if no valid messages after filtering
|
||||||
|
|
||||||
for msg in formatted_messages:
|
total_messages_collected += len(formatted_messages)
|
||||||
timestamp = msg.get("datetime", "Unknown Time")
|
|
||||||
msg_ts = msg.get("ts", timestamp) # Get original Slack timestamp
|
|
||||||
msg_user_name = msg.get("user_name", "Unknown User")
|
|
||||||
msg_user_email = msg.get("user_email", "Unknown Email")
|
|
||||||
msg_text = msg.get("text", "")
|
|
||||||
|
|
||||||
# Format document metadata
|
# =======================================================
|
||||||
metadata_sections = [
|
# Group messages into batches of SLACK_BATCH_SIZE
|
||||||
(
|
# Each batch becomes a single document with conversation context
|
||||||
"METADATA",
|
# =======================================================
|
||||||
[
|
for batch_start in range(
|
||||||
f"CHANNEL_NAME: {channel_name}",
|
0, len(formatted_messages), SLACK_BATCH_SIZE
|
||||||
f"CHANNEL_ID: {channel_id}",
|
):
|
||||||
f"MESSAGE_TIMESTAMP: {timestamp}",
|
batch = formatted_messages[
|
||||||
f"MESSAGE_USER_NAME: {msg_user_name}",
|
batch_start : batch_start + SLACK_BATCH_SIZE
|
||||||
f"MESSAGE_USER_EMAIL: {msg_user_email}",
|
|
||||||
],
|
|
||||||
),
|
|
||||||
(
|
|
||||||
"CONTENT",
|
|
||||||
["FORMAT: markdown", "TEXT_START", msg_text, "TEXT_END"],
|
|
||||||
),
|
|
||||||
]
|
]
|
||||||
|
|
||||||
# Build the document string
|
# Build combined document string from all messages in this batch
|
||||||
combined_document_string = build_document_metadata_markdown(
|
combined_document_string = _build_batch_document_string(
|
||||||
metadata_sections
|
team_name=team_name,
|
||||||
|
team_id=team_id,
|
||||||
|
channel_name=channel_name,
|
||||||
|
channel_id=channel_id,
|
||||||
|
messages=batch,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Generate unique identifier hash for this Slack message
|
# Generate unique identifier for this batch using
|
||||||
unique_identifier = f"{channel_id}_{msg_ts}"
|
# channel_id + first message ts + last message ts
|
||||||
|
first_msg_ts = batch[0].get("timestamp", "")
|
||||||
|
last_msg_ts = batch[-1].get("timestamp", "")
|
||||||
|
unique_identifier = (
|
||||||
|
f"{channel_id}_{first_msg_ts}_{last_msg_ts}"
|
||||||
|
)
|
||||||
unique_identifier_hash = generate_unique_identifier_hash(
|
unique_identifier_hash = generate_unique_identifier_hash(
|
||||||
DocumentType.SLACK_CONNECTOR, unique_identifier, search_space_id
|
DocumentType.SLACK_CONNECTOR,
|
||||||
|
unique_identifier,
|
||||||
|
search_space_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Generate content hash
|
# Generate content hash
|
||||||
|
|
@ -306,8 +392,10 @@ async def index_slack_messages(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check if document with this unique identifier already exists
|
# Check if document with this unique identifier already exists
|
||||||
existing_document = await check_document_by_unique_identifier(
|
existing_document = (
|
||||||
session, unique_identifier_hash
|
await check_document_by_unique_identifier(
|
||||||
|
session, unique_identifier_hash
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if existing_document:
|
if existing_document:
|
||||||
|
|
@ -317,26 +405,34 @@ async def index_slack_messages(
|
||||||
if not DocumentStatus.is_state(
|
if not DocumentStatus.is_state(
|
||||||
existing_document.status, DocumentStatus.READY
|
existing_document.status, DocumentStatus.READY
|
||||||
):
|
):
|
||||||
existing_document.status = DocumentStatus.ready()
|
existing_document.status = (
|
||||||
logger.info(
|
DocumentStatus.ready()
|
||||||
f"Document for Slack message {msg_ts} in channel {channel_name} unchanged. Skipping."
|
)
|
||||||
)
|
|
||||||
documents_skipped += 1
|
documents_skipped += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Queue existing document for update (will be set to processing in Phase 2)
|
# Queue existing document for update (will be set to processing in Phase 2)
|
||||||
messages_to_process.append(
|
batches_to_process.append(
|
||||||
{
|
{
|
||||||
"document": existing_document,
|
"document": existing_document,
|
||||||
"is_new": False,
|
"is_new": False,
|
||||||
"combined_document_string": combined_document_string,
|
"combined_document_string": combined_document_string,
|
||||||
"content_hash": content_hash,
|
"content_hash": content_hash,
|
||||||
|
"team_name": team_name,
|
||||||
|
"team_id": team_id,
|
||||||
"channel_name": channel_name,
|
"channel_name": channel_name,
|
||||||
"channel_id": channel_id,
|
"channel_id": channel_id,
|
||||||
"msg_ts": msg_ts,
|
"first_message_ts": first_msg_ts,
|
||||||
|
"last_message_ts": last_msg_ts,
|
||||||
|
"first_message_time": batch[0].get(
|
||||||
|
"datetime", "Unknown"
|
||||||
|
),
|
||||||
|
"last_message_time": batch[-1].get(
|
||||||
|
"datetime", "Unknown"
|
||||||
|
),
|
||||||
|
"message_count": len(batch),
|
||||||
"start_date": start_date_str,
|
"start_date": start_date_str,
|
||||||
"end_date": end_date_str,
|
"end_date": end_date_str,
|
||||||
"message_count": len(formatted_messages),
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
|
@ -344,28 +440,35 @@ async def index_slack_messages(
|
||||||
# Document doesn't exist by unique_identifier_hash
|
# Document doesn't exist by unique_identifier_hash
|
||||||
# Check if a document with the same content_hash exists (from another connector)
|
# Check if a document with the same content_hash exists (from another connector)
|
||||||
with session.no_autoflush:
|
with session.no_autoflush:
|
||||||
duplicate_by_content = await check_duplicate_document_by_hash(
|
duplicate_by_content = (
|
||||||
session, content_hash
|
await check_duplicate_document_by_hash(
|
||||||
|
session, content_hash
|
||||||
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
if duplicate_by_content:
|
if duplicate_by_content:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Slack message {msg_ts} in channel {channel_name} already indexed by another connector "
|
f"Slack batch ({len(batch)} msgs) in {team_name}#{channel_name} already indexed by another connector "
|
||||||
f"(existing document ID: {duplicate_by_content.id}, "
|
f"(existing document ID: {duplicate_by_content.id}, "
|
||||||
f"type: {duplicate_by_content.document_type}). Skipping."
|
f"type: {duplicate_by_content.document_type}). Skipping."
|
||||||
)
|
)
|
||||||
|
duplicate_content_count += 1
|
||||||
documents_skipped += 1
|
documents_skipped += 1
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Create new document with PENDING status (visible in UI immediately)
|
# Create new document with PENDING status (visible in UI immediately)
|
||||||
document = Document(
|
document = Document(
|
||||||
search_space_id=search_space_id,
|
search_space_id=search_space_id,
|
||||||
title=channel_name,
|
title=f"{team_name}#{channel_name}",
|
||||||
document_type=DocumentType.SLACK_CONNECTOR,
|
document_type=DocumentType.SLACK_CONNECTOR,
|
||||||
document_metadata={
|
document_metadata={
|
||||||
|
"team_name": team_name,
|
||||||
|
"team_id": team_id,
|
||||||
"channel_name": channel_name,
|
"channel_name": channel_name,
|
||||||
"channel_id": channel_id,
|
"channel_id": channel_id,
|
||||||
"msg_ts": msg_ts,
|
"first_message_ts": first_msg_ts,
|
||||||
|
"last_message_ts": last_msg_ts,
|
||||||
|
"message_count": len(batch),
|
||||||
"connector_id": connector_id,
|
"connector_id": connector_id,
|
||||||
},
|
},
|
||||||
content="Pending...", # Placeholder until processed
|
content="Pending...", # Placeholder until processed
|
||||||
|
|
@ -381,23 +484,33 @@ async def index_slack_messages(
|
||||||
session.add(document)
|
session.add(document)
|
||||||
new_documents_created = True
|
new_documents_created = True
|
||||||
|
|
||||||
messages_to_process.append(
|
batches_to_process.append(
|
||||||
{
|
{
|
||||||
"document": document,
|
"document": document,
|
||||||
"is_new": True,
|
"is_new": True,
|
||||||
"combined_document_string": combined_document_string,
|
"combined_document_string": combined_document_string,
|
||||||
"content_hash": content_hash,
|
"content_hash": content_hash,
|
||||||
|
"team_name": team_name,
|
||||||
|
"team_id": team_id,
|
||||||
"channel_name": channel_name,
|
"channel_name": channel_name,
|
||||||
"channel_id": channel_id,
|
"channel_id": channel_id,
|
||||||
"msg_ts": msg_ts,
|
"first_message_ts": first_msg_ts,
|
||||||
|
"last_message_ts": last_msg_ts,
|
||||||
|
"first_message_time": batch[0].get(
|
||||||
|
"datetime", "Unknown"
|
||||||
|
),
|
||||||
|
"last_message_time": batch[-1].get(
|
||||||
|
"datetime", "Unknown"
|
||||||
|
),
|
||||||
|
"message_count": len(batch),
|
||||||
"start_date": start_date_str,
|
"start_date": start_date_str,
|
||||||
"end_date": end_date_str,
|
"end_date": end_date_str,
|
||||||
"message_count": len(formatted_messages),
|
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}"
|
f"Phase 1: Collected {len(formatted_messages)} messages from channel {channel_name}, "
|
||||||
|
f"grouped into {(len(formatted_messages) + SLACK_BATCH_SIZE - 1) // SLACK_BATCH_SIZE} batch(es)"
|
||||||
)
|
)
|
||||||
|
|
||||||
except SlackApiError as slack_error:
|
except SlackApiError as slack_error:
|
||||||
|
|
@ -416,17 +529,20 @@ async def index_slack_messages(
|
||||||
# Commit all pending documents - they all appear in UI now
|
# Commit all pending documents - they all appear in UI now
|
||||||
if new_documents_created:
|
if new_documents_created:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Phase 1: Committing {len([m for m in messages_to_process if m['is_new']])} pending documents"
|
f"Phase 1: Committing {len([b for b in batches_to_process if b['is_new']])} pending batch documents "
|
||||||
|
f"({total_messages_collected} total messages across all channels)"
|
||||||
)
|
)
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
# =======================================================================
|
# =======================================================================
|
||||||
# PHASE 2: Process each document one by one
|
# PHASE 2: Process each batch document one by one
|
||||||
# Each document transitions: pending → processing → ready/failed
|
# Each document transitions: pending → processing → ready/failed
|
||||||
# =======================================================================
|
# =======================================================================
|
||||||
logger.info(f"Phase 2: Processing {len(messages_to_process)} documents")
|
logger.info(
|
||||||
|
f"Phase 2: Processing {len(batches_to_process)} batch documents"
|
||||||
|
)
|
||||||
|
|
||||||
for item in messages_to_process:
|
for item in batches_to_process:
|
||||||
# Send heartbeat periodically
|
# Send heartbeat periodically
|
||||||
if on_heartbeat_callback:
|
if on_heartbeat_callback:
|
||||||
current_time = time.time()
|
current_time = time.time()
|
||||||
|
|
@ -447,16 +563,22 @@ async def index_slack_messages(
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update document to READY with actual content
|
# Update document to READY with actual content
|
||||||
document.title = item["channel_name"]
|
document.title = f"{item['team_name']}#{item['channel_name']}"
|
||||||
document.content = item["combined_document_string"]
|
document.content = item["combined_document_string"]
|
||||||
document.content_hash = item["content_hash"]
|
document.content_hash = item["content_hash"]
|
||||||
document.embedding = doc_embedding
|
document.embedding = doc_embedding
|
||||||
document.document_metadata = {
|
document.document_metadata = {
|
||||||
|
"team_name": item["team_name"],
|
||||||
|
"team_id": item["team_id"],
|
||||||
"channel_name": item["channel_name"],
|
"channel_name": item["channel_name"],
|
||||||
"channel_id": item["channel_id"],
|
"channel_id": item["channel_id"],
|
||||||
|
"first_message_ts": item["first_message_ts"],
|
||||||
|
"last_message_ts": item["last_message_ts"],
|
||||||
|
"first_message_time": item["first_message_time"],
|
||||||
|
"last_message_time": item["last_message_time"],
|
||||||
|
"message_count": item["message_count"],
|
||||||
"start_date": item["start_date"],
|
"start_date": item["start_date"],
|
||||||
"end_date": item["end_date"],
|
"end_date": item["end_date"],
|
||||||
"message_count": item["message_count"],
|
|
||||||
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
"indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||||
"connector_id": connector_id,
|
"connector_id": connector_id,
|
||||||
}
|
}
|
||||||
|
|
@ -469,13 +591,13 @@ async def index_slack_messages(
|
||||||
# Batch commit every 10 documents (for ready status updates)
|
# Batch commit every 10 documents (for ready status updates)
|
||||||
if documents_indexed % 10 == 0:
|
if documents_indexed % 10 == 0:
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Committing batch: {documents_indexed} Slack messages processed so far"
|
f"Committing batch: {documents_indexed} batch documents processed so far"
|
||||||
)
|
)
|
||||||
await session.commit()
|
await session.commit()
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(
|
logger.error(
|
||||||
f"Error processing Slack message {item.get('msg_ts', 'Unknown')}: {e!s}",
|
f"Error processing Slack batch document: {e!s}",
|
||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
# Mark document as failed with reason (visible in UI)
|
# Mark document as failed with reason (visible in UI)
|
||||||
|
|
@ -493,10 +615,15 @@ async def index_slack_messages(
|
||||||
await update_connector_last_indexed(session, connector, update_last_indexed)
|
await update_connector_last_indexed(session, connector, update_last_indexed)
|
||||||
|
|
||||||
# Final commit for any remaining documents not yet committed in batches
|
# Final commit for any remaining documents not yet committed in batches
|
||||||
logger.info(f"Final commit: Total {documents_indexed} Slack messages processed")
|
logger.info(
|
||||||
|
f"Final commit: Total {documents_indexed} batch documents processed "
|
||||||
|
f"(from {total_messages_collected} messages)"
|
||||||
|
)
|
||||||
try:
|
try:
|
||||||
await session.commit()
|
await session.commit()
|
||||||
logger.info("Successfully committed all Slack document changes to database")
|
logger.info(
|
||||||
|
"Successfully committed all Slack document changes to database"
|
||||||
|
)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Handle any remaining integrity errors gracefully (race conditions, etc.)
|
# Handle any remaining integrity errors gracefully (race conditions, etc.)
|
||||||
if (
|
if (
|
||||||
|
|
@ -514,8 +641,12 @@ async def index_slack_messages(
|
||||||
|
|
||||||
# Build warning message if there were issues
|
# Build warning message if there were issues
|
||||||
warning_parts = []
|
warning_parts = []
|
||||||
|
if duplicate_content_count > 0:
|
||||||
|
warning_parts.append(f"{duplicate_content_count} duplicate")
|
||||||
if documents_failed > 0:
|
if documents_failed > 0:
|
||||||
warning_parts.append(f"{documents_failed} failed")
|
warning_parts.append(f"{documents_failed} failed")
|
||||||
|
if skipped_channels:
|
||||||
|
warning_parts.append(f"{len(skipped_channels)} channels skipped")
|
||||||
warning_message = ", ".join(warning_parts) if warning_parts else None
|
warning_message = ", ".join(warning_parts) if warning_parts else None
|
||||||
|
|
||||||
# Log success
|
# Log success
|
||||||
|
|
@ -527,13 +658,20 @@ async def index_slack_messages(
|
||||||
"documents_indexed": documents_indexed,
|
"documents_indexed": documents_indexed,
|
||||||
"documents_skipped": documents_skipped,
|
"documents_skipped": documents_skipped,
|
||||||
"documents_failed": documents_failed,
|
"documents_failed": documents_failed,
|
||||||
|
"duplicate_content_count": duplicate_content_count,
|
||||||
"skipped_channels_count": len(skipped_channels),
|
"skipped_channels_count": len(skipped_channels),
|
||||||
|
"total_messages_collected": total_messages_collected,
|
||||||
|
"batch_size": SLACK_BATCH_SIZE,
|
||||||
|
"team_id": team_id,
|
||||||
|
"team_name": team_name,
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
logger.info(
|
logger.info(
|
||||||
f"Slack indexing completed: {documents_indexed} ready, "
|
f"Slack indexing completed for workspace {team_name}: "
|
||||||
f"{documents_skipped} skipped, {documents_failed} failed"
|
f"{documents_indexed} batch docs ready (from {total_messages_collected} messages), "
|
||||||
|
f"{documents_skipped} skipped, {documents_failed} failed "
|
||||||
|
f"({duplicate_content_count} duplicate content)"
|
||||||
)
|
)
|
||||||
return documents_indexed, warning_message
|
return documents_indexed, warning_message
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue