From 6d1879ffcbf04c48fba6e64b304239292c21fe31 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Wed, 17 Jun 2026 15:06:05 +0200 Subject: [PATCH] continue indexing when notification creation fails --- .../app/tasks/celery_tasks/document_tasks.py | 182 ++++++++++-------- 1 file changed, 99 insertions(+), 83 deletions(-) diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index 41e029a60..4d71d6c9a 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -602,23 +602,29 @@ async def _process_file_upload( # Create notification for document processing logger.info(f"[_process_file_upload] Creating notification for: {filename}") - notification = ( - await NotificationService.document_processing.notify_processing_started( - session=session, - user_id=UUID(user_id), - document_type="FILE", - document_name=filename, - search_space_id=search_space_id, - file_size=file_size, + notification = None + heartbeat_task = None + try: + notification = ( + await NotificationService.document_processing.notify_processing_started( + session=session, + user_id=UUID(user_id), + document_type="FILE", + document_name=filename, + search_space_id=search_space_id, + file_size=file_size, + ) + ) + logger.info( + f"[_process_file_upload] Notification created with ID: {notification.id}" + ) + _start_heartbeat(notification.id) + heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id)) + except Exception: + logger.warning( + f"[_process_file_upload] Failed to create notification for: {filename}", + exc_info=True, ) - ) - logger.info( - f"[_process_file_upload] Notification created with ID: {notification.id if notification else 'None'}" - ) - - # Start Redis heartbeat for stale task detection - _start_heartbeat(notification.id) - heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id)) log_entry = await task_logger.log_task_start( task_name="process_file_upload", @@ -646,23 +652,21 @@ async def _process_file_upload( # Update notification on success if result: - await ( - NotificationService.document_processing.notify_processing_completed( + if notification: + await NotificationService.document_processing.notify_processing_completed( session=session, notification=notification, document_id=result.id, chunks_count=None, ) - ) else: # Duplicate detected - await ( - NotificationService.document_processing.notify_processing_completed( + if notification: + await NotificationService.document_processing.notify_processing_completed( session=session, notification=notification, error_message="Document already exists (duplicate)", ) - ) except Exception as e: # Import here to avoid circular dependencies @@ -691,13 +695,13 @@ async def _process_file_upload( error_message = str(credit_error) # Create a dedicated insufficient credits notification try: - # First, mark the processing notification as failed - await session.refresh(notification) - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message="Insufficient credits", - ) + if notification: + await session.refresh(notification) + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message="Insufficient credits", + ) # Then create a separate insufficient_credits notification for better UX await NotificationService.insufficient_credits.notify_insufficient_credits( @@ -717,12 +721,13 @@ async def _process_file_upload( # HTTPException with page limit message but no detailed cause error_message = str(e.detail) try: - await session.refresh(notification) - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message=error_message, - ) + if notification: + await session.refresh(notification) + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=error_message, + ) except Exception as notif_error: logger.error( f"Failed to update notification on failure: {notif_error!s}" @@ -731,13 +736,13 @@ async def _process_file_upload( error_message = str(e)[:100] # Update notification on failure - wrapped in try-except to ensure it doesn't fail silently try: - # Refresh notification to ensure it's not stale after any rollback - await session.refresh(notification) - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message=error_message, - ) + if notification: + await session.refresh(notification) + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=error_message, + ) except Exception as notif_error: logger.error( f"Failed to update notification on failure: {notif_error!s}" @@ -753,8 +758,10 @@ async def _process_file_upload( raise finally: # Stop heartbeat — key deleted on success, expires on crash - heartbeat_task.cancel() - _stop_heartbeat(notification.id) + if heartbeat_task: + heartbeat_task.cancel() + if notification: + _stop_heartbeat(notification.id) @celery_app.task(name="process_file_upload_with_document", bind=True) @@ -894,29 +901,36 @@ async def _process_file_with_document( logger.info( f"[_process_file_with_document] Creating notification for: {filename}" ) - notification = ( - await NotificationService.document_processing.notify_processing_started( - session=session, - user_id=UUID(user_id), - document_type="FILE", - document_name=filename, - search_space_id=search_space_id, - file_size=file_size, + notification = None + heartbeat_task = None + try: + notification = ( + await NotificationService.document_processing.notify_processing_started( + session=session, + user_id=UUID(user_id), + document_type="FILE", + document_name=filename, + search_space_id=search_space_id, + file_size=file_size, + ) ) - ) - # Store document_id in notification metadata so cleanup task can find the document - if notification and notification.notification_metadata is not None: - notification.notification_metadata["document_id"] = document_id - from sqlalchemy.orm.attributes import flag_modified + # Store document_id in notification metadata so cleanup task can find the document + if notification.notification_metadata is not None: + notification.notification_metadata["document_id"] = document_id + from sqlalchemy.orm.attributes import flag_modified - flag_modified(notification, "notification_metadata") - await session.commit() - await session.refresh(notification) + flag_modified(notification, "notification_metadata") + await session.commit() + await session.refresh(notification) - # Start Redis heartbeat for stale task detection - _start_heartbeat(notification.id) - heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id)) + _start_heartbeat(notification.id) + heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id)) + except Exception: + logger.warning( + f"[_process_file_with_document] Failed to create notification for: {filename}", + exc_info=True, + ) log_entry = await task_logger.log_task_start( task_name="process_file_upload_with_document", @@ -956,14 +970,13 @@ async def _process_file_with_document( # Update notification on success if result: - await ( - NotificationService.document_processing.notify_processing_completed( + if notification: + await NotificationService.document_processing.notify_processing_completed( session=session, notification=notification, document_id=result.id, chunks_count=None, ) - ) logger.info( f"[_process_file_with_document] Successfully processed document {document_id}" ) @@ -972,13 +985,12 @@ async def _process_file_with_document( document.status = DocumentStatus.failed("Duplicate content detected") document.updated_at = get_current_timestamp() await session.commit() - await ( - NotificationService.document_processing.notify_processing_completed( + if notification: + await NotificationService.document_processing.notify_processing_completed( session=session, notification=notification, error_message="Document already exists (duplicate)", ) - ) except Exception as e: # Import here to avoid circular dependencies @@ -1009,12 +1021,13 @@ async def _process_file_with_document( # Handle insufficient-credit errors with dedicated notification if credit_error is not None: try: - await session.refresh(notification) - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message="Insufficient credits", - ) + if notification: + await session.refresh(notification) + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message="Insufficient credits", + ) await NotificationService.insufficient_credits.notify_insufficient_credits( session=session, user_id=UUID(user_id), @@ -1031,12 +1044,13 @@ async def _process_file_with_document( else: # Update notification on failure try: - await session.refresh(notification) - await NotificationService.document_processing.notify_processing_completed( - session=session, - notification=notification, - error_message=str(e)[:100], - ) + if notification: + await session.refresh(notification) + await NotificationService.document_processing.notify_processing_completed( + session=session, + notification=notification, + error_message=str(e)[:100], + ) except Exception as notif_error: logger.error( f"Failed to update notification on failure: {notif_error!s}" @@ -1053,8 +1067,10 @@ async def _process_file_with_document( finally: # Stop heartbeat — key deleted on success, expires on crash - heartbeat_task.cancel() - _stop_heartbeat(notification.id) + if heartbeat_task: + heartbeat_task.cancel() + if notification: + _stop_heartbeat(notification.id) # Clean up temp file if os.path.exists(temp_path):