From c48ba36fa47ccffb10f68a76231ab017321c5dbe Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Fri, 23 Jan 2026 23:36:14 +0530 Subject: [PATCH] feat: improve indexing logic and duplicate handling in connectors - Enhanced Google Calendar and Composio connector indexing to track and log duplicate content, preventing re-indexing of already processed events. - Implemented robust error handling during final commits to manage integrity errors gracefully, ensuring successful indexing despite potential duplicates. - Updated notification service to differentiate between actual errors and warnings for duplicate content, improving user feedback. - Refactored date handling to ensure valid date ranges and adjusted end dates when necessary for better indexing accuracy. --- .../composio_google_calendar_connector.py | 59 +++++++++++++-- .../routes/search_source_connectors_routes.py | 72 +++++++++++++++---- .../app/services/notification_service.py | 28 ++++++-- .../google_calendar_indexer.py | 49 ++++++++++++- .../views/connector-edit-view.tsx | 14 ++-- .../hooks/use-connector-dialog.ts | 11 ++- 6 files changed, 198 insertions(+), 35 deletions(-) diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py index ab8bde53c..3ac235848 100644 --- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py +++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py @@ -18,7 +18,10 @@ from app.db import Document, DocumentType from app.services.composio_service import TOOLKIT_TO_DOCUMENT_TYPE from app.services.llm_service import get_user_long_context_llm from app.services.task_logging_service import TaskLoggingService -from app.tasks.connector_indexers.base import calculate_date_range +from app.tasks.connector_indexers.base import ( + calculate_date_range, + check_duplicate_document_by_hash, +) from app.utils.document_converters import ( create_document_chunks, generate_content_hash, @@ -256,6 +259,7 @@ async def index_composio_google_calendar( documents_indexed = 0 documents_skipped = 0 + duplicate_content_count = 0 # Track events skipped due to duplicate content_hash for event in events: try: @@ -349,7 +353,25 @@ async def index_composio_google_calendar( logger.info( f"Committing batch: {documents_indexed} Google Calendar events processed so far" ) - await session.commit() + await session.commit( ) + continue + + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from standard connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + # A document with the same content already exists (likely from standard connector) + logger.info( + f"Event {summary} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping to avoid duplicate content." + ) + duplicate_content_count += 1 + documents_skipped += 1 continue # Create new document @@ -429,10 +451,28 @@ async def index_composio_google_calendar( logger.info( f"Final commit: Total {documents_indexed} Google Calendar events processed" ) - await session.commit() - logger.info( - "Successfully committed all Composio Google Calendar document changes to database" - ) + try: + await session.commit() + logger.info( + "Successfully committed all Composio Google Calendar document changes to database" + ) + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower(): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same event was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise + + # Build warning message if duplicates were found + warning_message = None + if duplicate_content_count > 0: + warning_message = f"{duplicate_content_count} skipped (duplicate)" await task_logger.log_task_success( log_entry, @@ -440,10 +480,15 @@ async def index_composio_google_calendar( { "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "duplicate_content_count": duplicate_content_count, }, ) - return documents_indexed, None + logger.info( + f"Composio Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped " + f"({duplicate_content_count} due to duplicate content from other connectors)" + ) + return documents_indexed, warning_message except Exception as e: logger.error( diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 928327d9a..3b98d7d7c 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -22,6 +22,8 @@ import logging from datetime import UTC, datetime, timedelta from typing import Any +import pytz +from dateutil.parser import isoparse from fastapi import APIRouter, Body, Depends, HTTPException, Query from pydantic import BaseModel, Field, ValidationError from sqlalchemy.exc import IntegrityError @@ -681,6 +683,22 @@ async def index_connector_content( ]: # Default to today if no end_date provided (users can manually select future dates) indexing_to = today_str if end_date is None else end_date + + # If start_date and end_date are the same, adjust end_date to be one day later + # to ensure valid date range (start_date must be strictly before end_date) + if indexing_from == indexing_to: + dt = isoparse(indexing_to) + if dt.tzinfo is None: + dt = dt.replace(tzinfo=pytz.UTC) + else: + dt = dt.astimezone(pytz.UTC) + # Add one day to end_date to make it strictly after start_date + dt_end = dt + timedelta(days=1) + indexing_to = dt_end.strftime("%Y-%m-%d") + logger.info( + f"Adjusted end_date from {end_date} to {indexing_to} " + f"to ensure valid date range (start_date must be strictly before end_date)" + ) else: # For non-calendar connectors, cap at today indexing_to = end_date if end_date else today_str @@ -1231,20 +1249,48 @@ async def _run_indexing_with_notifications( else: # No new documents processed - check if this is an error or just no changes if error_or_warning: - # Actual failure - logger.error(f"Indexing failed: {error_or_warning}") - if notification: - # Refresh notification to ensure it's not stale after indexing function commits - await session.refresh(notification) - await NotificationService.connector_indexing.notify_indexing_completed( - session=session, - notification=notification, - indexed_count=0, - error_message=error_or_warning, + # Check if this is a duplicate warning (success case) or an actual error + # Handle both normal and Composio calendar connectors + error_or_warning_lower = str(error_or_warning).lower() if error_or_warning else "" + is_duplicate_warning = "skipped (duplicate)" in error_or_warning_lower + + if is_duplicate_warning: + # Duplicate warnings are success cases - sync worked, just found duplicates + logger.info( + f"Indexing completed successfully: {error_or_warning}" ) - await ( - session.commit() - ) # Commit to ensure Electric SQL syncs the notification update + # Still update timestamp so ElectricSQL syncs and clears "Syncing" UI + if update_timestamp_func: + await update_timestamp_func(session, connector_id) + await session.commit() # Commit timestamp update + if notification: + # Refresh notification to ensure it's not stale after timestamp update commit + await session.refresh(notification) + await NotificationService.connector_indexing.notify_indexing_completed( + session=session, + notification=notification, + indexed_count=0, + error_message=error_or_warning, # Pass as warning, not error + is_warning=True, # Flag to indicate this is a warning, not an error + ) + await ( + session.commit() + ) # Commit to ensure Electric SQL syncs the notification update + else: + # Actual failure + logger.error(f"Indexing failed: {error_or_warning}") + if notification: + # Refresh notification to ensure it's not stale after indexing function commits + await session.refresh(notification) + await NotificationService.connector_indexing.notify_indexing_completed( + session=session, + notification=notification, + indexed_count=0, + error_message=error_or_warning, + ) + await ( + session.commit() + ) # Commit to ensure Electric SQL syncs the notification update else: # Success - just no new documents to index (all skipped/unchanged) logger.info( diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 836daeb9e..9fcf807e7 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -335,6 +335,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): notification: Notification, indexed_count: int, error_message: str | None = None, + is_warning: bool = False, ) -> Notification: """ Update notification when connector indexing completes. @@ -343,7 +344,8 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): session: Database session notification: Notification to update indexed_count: Total number of items indexed - error_message: Error message if indexing failed (optional) + error_message: Error message if indexing failed, or warning message (optional) + is_warning: If True, treat error_message as a warning (success case) rather than an error Returns: Updated notification @@ -352,10 +354,26 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): "connector_name", "Connector" ) + # If there's an error message but items were indexed, treat it as a warning (partial success) + # If is_warning is True, treat it as success even with 0 items (e.g., duplicates found) + # Otherwise, treat it as a failure if error_message: - title = f"Failed: {connector_name}" - message = f"Sync failed: {error_message}" - status = "failed" + if indexed_count > 0: + # Partial success with warnings (e.g., duplicate content from other connectors) + title = f"Ready: {connector_name}" + item_text = "item" if indexed_count == 1 else "items" + message = f"Now searchable! {indexed_count} {item_text} synced. Note: {error_message}" + status = "completed" + elif is_warning: + # Warning case (e.g., duplicates found) - treat as success + title = f"Ready: {connector_name}" + message = f"Sync completed. {error_message}" + status = "completed" + else: + # Complete failure + title = f"Failed: {connector_name}" + message = f"Sync failed: {error_message}" + status = "failed" else: title = f"Ready: {connector_name}" if indexed_count == 0: @@ -367,7 +385,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): metadata_updates = { "indexed_count": indexed_count, - "sync_stage": "completed" if not error_message else "failed", + "sync_stage": "completed" if (not error_message or is_warning or indexed_count > 0) else "failed", "error_message": error_message, } diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 7787560fa..5bc805549 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -23,6 +23,7 @@ from app.utils.document_converters import ( from .base import ( check_document_by_unique_identifier, + check_duplicate_document_by_hash, get_connector_by_id, get_current_timestamp, logger, @@ -289,6 +290,7 @@ async def index_google_calendar_events( documents_indexed = 0 documents_skipped = 0 skipped_events = [] + duplicate_content_count = 0 # Track events skipped due to duplicate content_hash for event in events: try: @@ -409,6 +411,27 @@ async def index_google_calendar_events( ) continue + # Document doesn't exist by unique_identifier_hash + # Check if a document with the same content_hash exists (from another connector) + with session.no_autoflush: + duplicate_by_content = await check_duplicate_document_by_hash( + session, content_hash + ) + + if duplicate_by_content: + # A document with the same content already exists (likely from Composio connector) + logger.info( + f"Event {event_summary} already indexed by another connector " + f"(existing document ID: {duplicate_by_content.id}, " + f"type: {duplicate_by_content.document_type}). Skipping to avoid duplicate content." + ) + duplicate_content_count += 1 + documents_skipped += 1 + skipped_events.append( + f"{event_summary} (already indexed by another connector)" + ) + continue + # Document doesn't exist - create new one # Generate summary with metadata user_llm = await get_user_long_context_llm( @@ -501,7 +524,25 @@ async def index_google_calendar_events( logger.info( f"Final commit: Total {documents_indexed} Google Calendar events processed" ) - await session.commit() + try: + await session.commit() + except Exception as e: + # Handle any remaining integrity errors gracefully (race conditions, etc.) + if "duplicate key value violates unique constraint" in str(e).lower() or "uniqueviolationerror" in str(e).lower(): + logger.warning( + f"Duplicate content_hash detected during final commit. " + f"This may occur if the same event was indexed by multiple connectors. " + f"Rolling back and continuing. Error: {e!s}" + ) + await session.rollback() + # Don't fail the entire task - some documents may have been successfully indexed + else: + raise + + # Build warning message if duplicates were found + warning_message = None + if duplicate_content_count > 0: + warning_message = f"{duplicate_content_count} skipped (duplicate)" await task_logger.log_task_success( log_entry, @@ -510,14 +551,16 @@ async def index_google_calendar_events( "events_processed": total_processed, "documents_indexed": documents_indexed, "documents_skipped": documents_skipped, + "duplicate_content_count": duplicate_content_count, "skipped_events_count": len(skipped_events), }, ) logger.info( - f"Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped" + f"Google Calendar indexing completed: {documents_indexed} new events, {documents_skipped} skipped " + f"({duplicate_content_count} due to duplicate content from other connectors)" ) - return total_processed, None + return total_processed, warning_message except SQLAlchemyError as db_error: await session.rollback() diff --git a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx index d12264fbd..8f58db542 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx +++ b/surfsense_web/components/assistant-ui/connector-popup/connector-configs/views/connector-edit-view.tsx @@ -100,10 +100,14 @@ export const ConnectorEditView: FC = ({ // Reset local quick indexing state when indexing completes or fails useEffect(() => { - if (!isIndexing) { - setIsQuickIndexing(false); + if (!isIndexing && isQuickIndexing) { + // Small delay to ensure smooth transition + const timer = setTimeout(() => { + setIsQuickIndexing(false); + }, 100); + return () => clearTimeout(timer); } - }, [isIndexing]); + }, [isIndexing, isQuickIndexing]); const handleDisconnectClick = () => { setShowDisconnectConfirm(true); @@ -119,11 +123,11 @@ export const ConnectorEditView: FC = ({ }; const handleQuickIndex = useCallback(() => { - if (onQuickIndex) { + if (onQuickIndex && !isQuickIndexing && !isIndexing) { setIsQuickIndexing(true); onQuickIndex(); } - }, [onQuickIndex]); + }, [onQuickIndex, isQuickIndexing, isIndexing]); return (
diff --git a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts index 1bcbd4263..9a7f15b0c 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts +++ b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-connector-dialog.ts @@ -1409,7 +1409,12 @@ export const useConnectorDialog = () => { startDate?: Date, endDate?: Date ) => { - if (!searchSpaceId) return; + if (!searchSpaceId) { + if (stopIndexing) { + stopIndexing(connectorId); + } + return; + } // Track quick index clicked event if (connectorType) { @@ -1437,6 +1442,8 @@ export const useConnectorDialog = () => { queryClient.invalidateQueries({ queryKey: cacheKeys.logs.summary(Number(searchSpaceId)), }); + // Note: Don't call stopIndexing here - let useIndexingConnectors hook + // detect when last_indexed_at changes via Electric SQL } catch (error) { console.error("Error indexing connector content:", error); toast.error(error instanceof Error ? error.message : "Failed to start indexing"); @@ -1446,7 +1453,7 @@ export const useConnectorDialog = () => { } } }, - [searchSpaceId, indexConnector] + [searchSpaceId, indexConnector, queryClient] ); // Handle going back from edit view