From 24fd873ca7a89574102f6ba22e18cd0d79d80044 Mon Sep 17 00:00:00 2001 From: "DESKTOP-RTLN3BA\\$punk" Date: Wed, 26 Mar 2025 17:44:38 -0700 Subject: [PATCH] fix: Fixed Slack Reindexing --- .../routes/search_source_connectors_routes.py | 10 +- .../app/tasks/connectors_indexing_tasks.py | 115 +++++++++++++----- 2 files changed, 91 insertions(+), 34 deletions(-) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index d0b63d932..869e17592 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -370,19 +370,19 @@ async def run_slack_indexing( """ try: # Index Slack messages without updating last_indexed_at (we'll do it separately) - documents_indexed, error_or_warning = await index_slack_messages( + documents_processed, error_or_warning = await index_slack_messages( session=session, connector_id=connector_id, search_space_id=search_space_id, update_last_indexed=False # Don't update timestamp in the indexing function ) - # Only update last_indexed_at if indexing was successful - if documents_indexed > 0 and (error_or_warning is None or "Indexed" in error_or_warning): + # Only update last_indexed_at if indexing was successful (either new docs or updated docs) + if documents_processed > 0: await update_connector_last_indexed(session, connector_id) - logger.info(f"Slack indexing completed successfully: {documents_indexed} documents indexed") + logger.info(f"Slack indexing completed successfully: {documents_processed} documents processed") else: - logger.error(f"Slack indexing failed or no documents indexed: {error_or_warning}") + logger.error(f"Slack indexing failed or no documents processed: {error_or_warning}") except Exception as e: logger.error(f"Error in background Slack indexing task: {str(e)}") diff --git a/surfsense_backend/app/tasks/connectors_indexing_tasks.py b/surfsense_backend/app/tasks/connectors_indexing_tasks.py index 2fb8e8dff..580a5c71f 100644 --- a/surfsense_backend/app/tasks/connectors_indexing_tasks.py +++ b/surfsense_backend/app/tasks/connectors_indexing_tasks.py @@ -59,16 +59,8 @@ async def index_slack_messages( end_date = datetime.now() # Use last_indexed_at as start date if available, otherwise use 365 days ago - if connector.last_indexed_at: - # Check if last_indexed_at is today - today = datetime.now().date() - if connector.last_indexed_at.date() == today: - # If last indexed today, go back 7 day to ensure we don't miss anything - start_date = end_date - timedelta(days=7) - else: - start_date = connector.last_indexed_at - else: - start_date = end_date - timedelta(days=365) + + start_date = end_date - timedelta(days=365) # Format dates for Slack API start_date_str = start_date.strftime("%Y-%m-%d") @@ -82,9 +74,29 @@ async def index_slack_messages( if not channels: return 0, "No Slack channels found" + + # Get existing documents for this search space and connector type to prevent duplicates + existing_docs_result = await session.execute( + select(Document) + .filter( + Document.search_space_id == search_space_id, + Document.document_type == DocumentType.SLACK_CONNECTOR + ) + ) + existing_docs = existing_docs_result.scalars().all() + + # Create a lookup dictionary of existing documents by channel_id + existing_docs_by_channel_id = {} + for doc in existing_docs: + if "channel_id" in doc.document_metadata: + existing_docs_by_channel_id[doc.document_metadata["channel_id"]] = doc + + logger.info(f"Found {len(existing_docs_by_channel_id)} existing Slack documents in database") # Track the number of documents indexed documents_indexed = 0 + documents_updated = 0 + documents_skipped = 0 skipped_channels = [] # Process each channel @@ -102,11 +114,13 @@ async def index_slack_messages( if not is_member: logger.warning(f"Bot is not a member of private channel {channel_name} ({channel_id}). Skipping.") skipped_channels.append(f"{channel_name} (private, bot not a member)") + documents_skipped += 1 continue except SlackApiError as e: if "not_in_channel" in str(e) or "channel_not_found" in str(e): logger.warning(f"Bot cannot access channel {channel_name} ({channel_id}). Skipping.") skipped_channels.append(f"{channel_name} (access error)") + documents_skipped += 1 continue else: # Re-raise if it's a different error @@ -123,10 +137,12 @@ async def index_slack_messages( if error: logger.warning(f"Error getting messages from channel {channel_name}: {error}") skipped_channels.append(f"{channel_name} (error: {error})") + documents_skipped += 1 continue # Skip this channel if there's an error if not messages: logger.info(f"No messages found in channel {channel_name} for the specified date range.") + documents_skipped += 1 continue # Skip if no messages # Format messages with user info @@ -141,6 +157,7 @@ async def index_slack_messages( if not formatted_messages: logger.info(f"No valid messages found in channel {channel_name} after filtering.") + documents_skipped += 1 continue # Skip if no valid messages after filtering # Convert messages to markdown format @@ -195,40 +212,77 @@ async def index_slack_messages( for chunk in config.chunker_instance.chunk(channel_content) ] - # Create and store document - document = Document( - search_space_id=search_space_id, - title=f"Slack - {channel_name}", - document_type=DocumentType.SLACK_CONNECTOR, - document_metadata={ + # Check if this channel already exists in our database + existing_document = existing_docs_by_channel_id.get(channel_id) + + if existing_document: + # Update existing document instead of creating a new one + logger.info(f"Updating existing document for channel {channel_name}") + + # Update document fields + existing_document.title = f"Slack - {channel_name}" + existing_document.document_metadata = { "channel_name": channel_name, "channel_id": channel_id, "start_date": start_date_str, "end_date": end_date_str, "message_count": len(formatted_messages), - "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") - }, - content=summary_content, - embedding=summary_embedding, - chunks=chunks - ) - - session.add(document) - documents_indexed += 1 - logger.info(f"Successfully indexed channel {channel_name} with {len(formatted_messages)} messages") + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"), + "last_updated": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + } + existing_document.content = summary_content + existing_document.embedding = summary_embedding + + # Delete existing chunks and add new ones + await session.execute( + delete(Chunk) + .where(Chunk.document_id == existing_document.id) + ) + + # Assign new chunks to existing document + for chunk in chunks: + chunk.document_id = existing_document.id + session.add(chunk) + + documents_updated += 1 + else: + # Create and store new document + document = Document( + search_space_id=search_space_id, + title=f"Slack - {channel_name}", + document_type=DocumentType.SLACK_CONNECTOR, + document_metadata={ + "channel_name": channel_name, + "channel_id": channel_id, + "start_date": start_date_str, + "end_date": end_date_str, + "message_count": len(formatted_messages), + "indexed_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S") + }, + content=summary_content, + embedding=summary_embedding, + chunks=chunks + ) + + session.add(document) + documents_indexed += 1 + logger.info(f"Successfully indexed new channel {channel_name} with {len(formatted_messages)} messages") except SlackApiError as slack_error: logger.error(f"Slack API error for channel {channel_name}: {str(slack_error)}") skipped_channels.append(f"{channel_name} (Slack API error)") + documents_skipped += 1 continue # Skip this channel and continue with others except Exception as e: logger.error(f"Error processing channel {channel_name}: {str(e)}") skipped_channels.append(f"{channel_name} (processing error)") + documents_skipped += 1 continue # Skip this channel and continue with others # Update the last_indexed_at timestamp for the connector only if requested # and if we successfully indexed at least one channel - if update_last_indexed and documents_indexed > 0: + total_processed = documents_indexed + documents_updated + if update_last_indexed and total_processed > 0: connector.last_indexed_at = datetime.now() # Commit all changes @@ -237,9 +291,12 @@ async def index_slack_messages( # Prepare result message result_message = None if skipped_channels: - result_message = f"Indexed {documents_indexed} channels. Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}" + result_message = f"Processed {total_processed} channels ({documents_indexed} new, {documents_updated} updated). Skipped {len(skipped_channels)} channels: {', '.join(skipped_channels)}" + else: + result_message = f"Processed {total_processed} channels ({documents_indexed} new, {documents_updated} updated)." - return documents_indexed, result_message + logger.info(f"Slack indexing completed: {documents_indexed} new channels, {documents_updated} updated, {documents_skipped} skipped") + return total_processed, result_message except SQLAlchemyError as db_error: await session.rollback()