diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index e3b988676..9a1937d6b 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -464,6 +464,22 @@ async def check_document_by_unique_identifier( return existing_doc_result.scalars().first() +async def check_document_by_content_hash( + session: AsyncSession, content_hash: str +) -> Document | None: + """Check if a document with the given content hash already exists. + + This is used to prevent duplicate content from being indexed, regardless + of which connector originally indexed it. + """ + from sqlalchemy.future import select + + existing_doc_result = await session.execute( + select(Document).where(Document.content_hash == content_hash) + ) + return existing_doc_result.scalars().first() + + async def update_connector_last_indexed( session: AsyncSession, connector, @@ -487,8 +503,11 @@ async def index_composio_google_drive( log_entry, update_last_indexed: bool = True, max_items: int = 1000, -) -> tuple[int, str]: +) -> tuple[int, int, str | None]: """Index Google Drive files via Composio with delta sync support. + + Returns: + Tuple of (documents_indexed, documents_skipped, error_message or None) Delta Sync Flow: 1. First sync: Full scan + get initial page token @@ -628,11 +647,11 @@ async def index_composio_google_drive( }, ) - return documents_indexed, error_message + return documents_indexed, documents_skipped, error_message except Exception as e: logger.error(f"Failed to index Google Drive via Composio: {e!s}", exc_info=True) - return 0, f"Failed to index Google Drive via Composio: {e!s}" + return 0, 0, f"Failed to index Google Drive via Composio: {e!s}" async def _index_composio_drive_delta_sync( @@ -1000,7 +1019,7 @@ async def _process_single_drive_file( if existing_document: if existing_document.content_hash == content_hash: - return 0, 1, processing_errors # Skipped + return 0, 1, processing_errors # Skipped - unchanged # Update existing document user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -1039,7 +1058,17 @@ async def _process_single_drive_file( existing_document.chunks = chunks existing_document.updated_at = get_current_timestamp() - return 1, 0, processing_errors # Indexed + return 1, 0, processing_errors # Indexed - updated + + # Check if content_hash already exists (from any connector) + # This prevents duplicate content and avoids IntegrityError on unique constraint + existing_by_content_hash = await check_document_by_content_hash(session, content_hash) + if existing_by_content_hash: + logger.info( + f"Skipping file {file_name} (file_id={file_id}): identical content " + f"already indexed as '{existing_by_content_hash.title}'" + ) + return 0, 1, processing_errors # Skipped - duplicate content # Create new document user_llm = await get_user_long_context_llm(session, user_id, search_space_id) @@ -1085,7 +1114,7 @@ async def _process_single_drive_file( ) session.add(document) - return 1, 0, processing_errors # Indexed + return 1, 0, processing_errors # Indexed - new async def _fetch_folder_files_recursively( diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 191c6f954..9815ad827 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1180,7 +1180,8 @@ async def _run_indexing_with_notifications( ) # Run the indexing function - documents_processed, error_or_warning = await indexing_function( + # Some indexers return (indexed, error), others return (indexed, skipped, error) + result = await indexing_function( session=session, connector_id=connector_id, search_space_id=search_space_id, @@ -1189,6 +1190,13 @@ async def _run_indexing_with_notifications( end_date=end_date, update_last_indexed=False, ) + + # Handle both 2-tuple and 3-tuple returns for backwards compatibility + if len(result) == 3: + documents_processed, documents_skipped, error_or_warning = result + else: + documents_processed, error_or_warning = result + documents_skipped = None # Update connector timestamp if function provided and indexing was successful if documents_processed > 0 and update_timestamp_func: @@ -1216,6 +1224,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed + skipped_count=documents_skipped, ) await ( session.commit() @@ -1242,6 +1251,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=documents_processed, error_message=error_or_warning, # Show errors even if some documents were indexed + skipped_count=documents_skipped, ) await ( session.commit() @@ -1283,6 +1293,7 @@ async def _run_indexing_with_notifications( indexed_count=0, error_message=notification_message, # Pass as warning, not error is_warning=True, # Flag to indicate this is a warning, not an error + skipped_count=documents_skipped, ) await ( session.commit() @@ -1298,6 +1309,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=error_or_warning, + skipped_count=documents_skipped, ) await ( session.commit() @@ -1319,6 +1331,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=None, # No error - sync succeeded + skipped_count=documents_skipped, ) await ( session.commit() @@ -1336,6 +1349,7 @@ async def _run_indexing_with_notifications( notification=notification, indexed_count=0, error_message=str(e), + skipped_count=None, # Unknown on exception ) except Exception as notif_error: logger.error(f"Failed to update notification: {notif_error!s}") diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index 04f39d8ef..ab0fcbfd4 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -336,6 +336,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): indexed_count: int, error_message: str | None = None, is_warning: bool = False, + skipped_count: int | None = None, ) -> Notification: """ Update notification when connector indexing completes. @@ -346,6 +347,7 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): indexed_count: Total number of items indexed error_message: Error message if indexing failed, or warning message (optional) is_warning: If True, treat error_message as a warning (success case) rather than an error + skipped_count: Number of items skipped (e.g., duplicates) - optional Returns: Updated notification @@ -354,6 +356,12 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): "connector_name", "Connector" ) + # Build the skipped text if there are skipped items + skipped_text = "" + if skipped_count and skipped_count > 0: + skipped_item_text = "item" if skipped_count == 1 else "items" + skipped_text = f" ({skipped_count} {skipped_item_text} skipped - already indexed)" + # If there's an error message but items were indexed, treat it as a warning (partial success) # If is_warning is True, treat it as success even with 0 items (e.g., duplicates found) # Otherwise, treat it as a failure @@ -362,12 +370,12 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): # Partial success with warnings (e.g., duplicate content from other connectors) title = f"Ready: {connector_name}" item_text = "item" if indexed_count == 1 else "items" - message = f"Now searchable! {indexed_count} {item_text} synced. Note: {error_message}" + message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}. Note: {error_message}" status = "completed" elif is_warning: # Warning case (e.g., duplicates found) - treat as success title = f"Ready: {connector_name}" - message = f"Sync completed. {error_message}" + message = f"Sync completed{skipped_text}. {error_message}" status = "completed" else: # Complete failure @@ -377,14 +385,19 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): else: title = f"Ready: {connector_name}" if indexed_count == 0: - message = "Already up to date! No new items to sync." + if skipped_count and skipped_count > 0: + skipped_item_text = "item" if skipped_count == 1 else "items" + message = f"Already up to date! {skipped_count} {skipped_item_text} skipped (already indexed)." + else: + message = "Already up to date! No new items to sync." else: item_text = "item" if indexed_count == 1 else "items" - message = f"Now searchable! {indexed_count} {item_text} synced." + message = f"Now searchable! {indexed_count} {item_text} synced{skipped_text}." status = "completed" metadata_updates = { "indexed_count": indexed_count, + "skipped_count": skipped_count or 0, "sync_stage": "completed" if (not error_message or is_warning or indexed_count > 0) else "failed", diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index f97652114..ffc4a1f27 100644 --- a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -86,7 +86,7 @@ async def index_composio_connector( end_date: str | None = None, update_last_indexed: bool = True, max_items: int = 1000, -) -> tuple[int, str]: +) -> tuple[int, int, str | None]: """ Index content from a Composio connector. @@ -104,7 +104,7 @@ async def index_composio_connector( max_items: Maximum number of items to fetch Returns: - Tuple of (number_of_indexed_items, error_message or None) + Tuple of (number_of_indexed_items, number_of_skipped_items, error_message or None) """ task_logger = TaskLoggingService(session, search_space_id) @@ -132,14 +132,14 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "InvalidConnectorType"} ) - return 0, error_msg + return 0, 0, error_msg if not connector: error_msg = f"Composio connector with ID {connector_id} not found" await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "ConnectorNotFound"} ) - return 0, error_msg + return 0, 0, error_msg # Get toolkit ID from config toolkit_id = connector.config.get("toolkit_id") @@ -150,7 +150,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "MissingToolkitId"} ) - return 0, error_msg + return 0, 0, error_msg # Check if toolkit is indexable if toolkit_id not in INDEXABLE_TOOLKITS: @@ -158,7 +158,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, error_msg, {"error_type": "ToolkitNotIndexable"} ) - return 0, error_msg + return 0, 0, error_msg # Get indexer function from registry try: @@ -167,7 +167,7 @@ async def index_composio_connector( await task_logger.log_task_failure( log_entry, str(e), {"error_type": "NoIndexerImplemented"} ) - return 0, str(e) + return 0, 0, str(e) # Build kwargs for the indexer function kwargs = { @@ -199,7 +199,7 @@ async def index_composio_connector( {"error_type": "SQLAlchemyError"}, ) logger.error(f"Database error: {db_error!s}", exc_info=True) - return 0, f"Database error: {db_error!s}" + return 0, 0, f"Database error: {db_error!s}" except Exception as e: await session.rollback() await task_logger.log_task_failure( @@ -209,4 +209,4 @@ async def index_composio_connector( {"error_type": type(e).__name__}, ) logger.error(f"Failed to index Composio connector: {e!s}", exc_info=True) - return 0, f"Failed to index Composio connector: {e!s}" + return 0, 0, f"Failed to index Composio connector: {e!s}" diff --git a/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json b/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json index 9c8585a0f..b729c3f8b 100644 --- a/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json +++ b/surfsense_web/components/assistant-ui/connector-popup/config/connector-status-config.json @@ -24,11 +24,6 @@ "enabled": true, "status": "warning", "statusMessage": "Some requests may be blocked if not using Firecrawl." - }, - "COMPOSIO_GOOGLE_DRIVE_CONNECTOR": { - "enabled": false, - "status": "disabled", - "statusMessage": "Not available yet." } }, "globalSettings": {