diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index c14fe5951..60581ba3f 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1660,7 +1660,10 @@ async def run_google_drive_indexing( user_id: str, items_dict: dict, # Dictionary with 'folders' and 'files' lists ): - """Runs the Google Drive indexing task for folders and files and updates the timestamp.""" + """Runs the Google Drive indexing task for folders and files with notifications.""" + from uuid import UUID + + notification = None try: from app.tasks.connector_indexers.google_drive_indexer import ( index_google_drive_files, @@ -1672,6 +1675,27 @@ async def run_google_drive_indexing( total_indexed = 0 errors = [] + # Get connector info for notification + connector_result = await session.execute( + select(SearchSourceConnector).where(SearchSourceConnector.id == connector_id) + ) + connector = connector_result.scalar_one_or_none() + + if connector: + # Create notification when indexing starts + notification = await NotificationService.connector_indexing.notify_google_drive_indexing_started( + session=session, + user_id=UUID(user_id), + connector_id=connector_id, + connector_name=connector.name, + connector_type=connector.connector_type.value, + search_space_id=search_space_id, + folder_count=len(items.folders), + file_count=len(items.files), + folder_names=items.get_folder_names() if items.folders else None, + file_names=items.get_file_names() if items.files else None, + ) + # Index each folder for folder in items.folders: try: @@ -1718,9 +1742,12 @@ async def run_google_drive_indexing( exc_info=True, ) + # Prepare error message for notification + error_message = None if errors: + error_message = "; ".join(errors) logger.error( - f"Google Drive indexing completed with errors for connector {connector_id}: {'; '.join(errors)}" + f"Google Drive indexing completed with errors for connector {connector_id}: {error_message}" ) else: logger.info( @@ -1729,12 +1756,33 @@ async def run_google_drive_indexing( # Update the last indexed timestamp only on full success await _update_connector_timestamp_by_id(session, connector_id) await session.commit() # Commit timestamp update + + # Update notification on completion + if notification: + await NotificationService.connector_indexing.notify_indexing_completed( + session=session, + notification=notification, + indexed_count=total_indexed, + error_message=error_message, + ) + except Exception as e: logger.error( f"Critical error in run_google_drive_indexing for connector {connector_id}: {e}", exc_info=True, ) - # Optionally update status in DB to indicate failure + + # Update notification on exception + if notification: + try: + await NotificationService.connector_indexing.notify_indexing_completed( + session=session, + notification=notification, + indexed_count=0, + error_message=str(e), + ) + except Exception as notif_error: + logger.error(f"Failed to update notification: {notif_error!s}") # Add new helper functions for luma indexing diff --git a/surfsense_backend/app/services/notification_service.py b/surfsense_backend/app/services/notification_service.py index cba9a8117..e5387b98b 100644 --- a/surfsense_backend/app/services/notification_service.py +++ b/surfsense_backend/app/services/notification_service.py @@ -199,6 +199,24 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): date_range = f"_{start_date or 'none'}_{end_date or 'none'}" return f"connector_{connector_id}_{timestamp}{date_range}" + def _generate_google_drive_operation_id( + self, connector_id: int, folder_count: int, file_count: int + ) -> str: + """ + Generate a unique operation ID for a Google Drive indexing operation. + + Args: + connector_id: Connector ID + folder_count: Number of folders to index + file_count: Number of files to index + + Returns: + Unique operation ID string + """ + timestamp = datetime.now(UTC).strftime("%Y%m%d_%H%M%S") + items_info = f"_{folder_count}f_{file_count}files" + return f"drive_{connector_id}_{timestamp}{items_info}" + async def notify_indexing_started( self, session: AsyncSession, @@ -332,6 +350,75 @@ class ConnectorIndexingNotificationHandler(BaseNotificationHandler): metadata_updates=metadata_updates, ) + async def notify_google_drive_indexing_started( + self, + session: AsyncSession, + user_id: UUID, + connector_id: int, + connector_name: str, + connector_type: str, + search_space_id: int, + folder_count: int, + file_count: int, + folder_names: list[str] | None = None, + file_names: list[str] | None = None, + ) -> Notification: + """ + Create or update notification when Google Drive indexing starts. + + Args: + session: Database session + user_id: User ID + connector_id: Connector ID + connector_name: Connector name + connector_type: Connector type + search_space_id: Search space ID + folder_count: Number of folders to index + file_count: Number of files to index + folder_names: List of folder names (optional) + file_names: List of file names (optional) + + Returns: + Notification: The created or updated notification + """ + operation_id = self._generate_google_drive_operation_id( + connector_id, folder_count, file_count + ) + title = f"Indexing: {connector_name}" + + # Create descriptive message + items_desc = [] + if folder_count > 0: + items_desc.append(f"{folder_count} folder{'s' if folder_count != 1 else ''}") + if file_count > 0: + items_desc.append(f"{file_count} file{'s' if file_count != 1 else ''}") + + message = f'Indexing "{connector_name}" ({", ".join(items_desc)}) in progress...' + + metadata = { + "connector_id": connector_id, + "connector_name": connector_name, + "connector_type": connector_type, + "folder_count": folder_count, + "file_count": file_count, + "indexed_count": 0, + } + + if folder_names: + metadata["folder_names"] = folder_names + if file_names: + metadata["file_names"] = file_names + + return await self.find_or_create_notification( + session=session, + user_id=user_id, + operation_id=operation_id, + title=title, + message=message, + search_space_id=search_space_id, + initial_metadata=metadata, + ) + class NotificationService: """Service for creating and managing notifications that sync via Electric SQL."""