From 9d0f5b4249f6ee15eee12575b93468ae17e82f3f Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Wed, 14 Jan 2026 04:01:20 +0530 Subject: [PATCH] fix: Ensure notification updates are reliable during error handling - Added session refresh for notifications to prevent stale data after rollbacks in multiple document processing tasks. - Wrapped notification update logic in try-except blocks to handle potential failures gracefully and log errors without crashing the process. - Improved error handling for notification updates in various document processing functions, enhancing overall robustness. --- .../routes/search_source_connectors_routes.py | 4 + .../app/tasks/celery_tasks/document_tasks.py | 76 +++++++++++++------ 2 files changed, 58 insertions(+), 22 deletions(-) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index f63349916..c111ede11 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1090,6 +1090,8 @@ async def _run_indexing_with_notifications( # Update notification on exception if notification: try: + # Refresh notification to ensure it's not stale after any rollback + await session.refresh(notification) await NotificationService.connector_indexing.notify_indexing_completed( session=session, notification=notification, @@ -1824,6 +1826,8 @@ async def run_google_drive_indexing( # Update notification on exception if notification: try: + # Refresh notification to ensure it's not stale after any rollback + await session.refresh(notification) await NotificationService.connector_indexing.notify_indexing_completed( session=session, notification=notification, diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index d59ec95a2..59b062311 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -166,12 +166,21 @@ async def _process_extension_document( {"error_type": type(e).__name__}, ) - # Update notification on failure - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message=str(e)[:100], - ) + # Update notification on failure - wrapped in try-except to ensure it doesn't fail silently + try: + # Refresh notification to ensure it's not stale after any rollback + await session.refresh(notification) + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=str(e)[:100], + ) + ) + except Exception as notif_error: + logger.error( + f"Failed to update notification on failure: {notif_error!s}" + ) logger.error(f"Error processing extension document: {e!s}") raise @@ -280,12 +289,21 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str): {"error_type": type(e).__name__}, ) - # Update notification on failure - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message=str(e)[:100], - ) + # Update notification on failure - wrapped in try-except to ensure it doesn't fail silently + try: + # Refresh notification to ensure it's not stale after any rollback + await session.refresh(notification) + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=str(e)[:100], + ) + ) + except Exception as notif_error: + logger.error( + f"Failed to update notification on failure: {notif_error!s}" + ) logger.error(f"Error processing YouTube video: {e!s}") raise @@ -404,12 +422,21 @@ async def _process_file_upload( else: error_message = str(e)[:100] - # Update notification on failure - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message=error_message, - ) + # Update notification on failure - wrapped in try-except to ensure it doesn't fail silently + try: + # Refresh notification to ensure it's not stale after any rollback + await session.refresh(notification) + await ( + NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=error_message, + ) + ) + except Exception as notif_error: + logger.error( + f"Failed to update notification on failure: {notif_error!s}" + ) await task_logger.log_task_failure( log_entry, @@ -564,15 +591,20 @@ async def _process_circleback_meeting( {"error_type": type(e).__name__, "meeting_id": meeting_id}, ) - # Update notification on failure + # Update notification on failure - wrapped in try-except to ensure it doesn't fail silently if notification: - await ( - NotificationService.document_processing.notify_processing_completed( + try: + # Refresh notification to ensure it's not stale after any rollback + await session.refresh(notification) + await NotificationService.document_processing.notify_processing_completed( session=session, notification=notification, error_message=str(e)[:100], ) - ) + except Exception as notif_error: + logger.error( + f"Failed to update notification on failure: {notif_error!s}" + ) logger.error(f"Error processing Circleback meeting: {e!s}") raise