fix: Ensure notification updates are reliable during error handling

- Added session refresh for notifications to prevent stale data after rollbacks in multiple document processing tasks.
- Wrapped notification update logic in try-except blocks to handle potential failures gracefully and log errors without crashing the process.
- Improved error handling for notification updates in various document processing functions, enhancing overall robustness.
This commit is contained in:
Anish Sarkar 2026-01-14 04:01:20 +05:30
parent 69f46ff3f4
commit 9d0f5b4249
2 changed files with 58 additions and 22 deletions

View file

@ -1090,6 +1090,8 @@ async def _run_indexing_with_notifications(
# Update notification on exception # Update notification on exception
if notification: if notification:
try: try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await NotificationService.connector_indexing.notify_indexing_completed( await NotificationService.connector_indexing.notify_indexing_completed(
session=session, session=session,
notification=notification, notification=notification,
@ -1824,6 +1826,8 @@ async def run_google_drive_indexing(
# Update notification on exception # Update notification on exception
if notification: if notification:
try: try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await NotificationService.connector_indexing.notify_indexing_completed( await NotificationService.connector_indexing.notify_indexing_completed(
session=session, session=session,
notification=notification, notification=notification,

View file

@ -166,12 +166,21 @@ async def _process_extension_document(
{"error_type": type(e).__name__}, {"error_type": type(e).__name__},
) )
# Update notification on failure # Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
await NotificationService.document_processing.notify_processing_completed( try:
session=session, # Refresh notification to ensure it's not stale after any rollback
notification=notification, await session.refresh(notification)
error_message=str(e)[:100], await (
) NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
logger.error(f"Error processing extension document: {e!s}") logger.error(f"Error processing extension document: {e!s}")
raise raise
@ -280,12 +289,21 @@ async def _process_youtube_video(url: str, search_space_id: int, user_id: str):
{"error_type": type(e).__name__}, {"error_type": type(e).__name__},
) )
# Update notification on failure # Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
await NotificationService.document_processing.notify_processing_completed( try:
session=session, # Refresh notification to ensure it's not stale after any rollback
notification=notification, await session.refresh(notification)
error_message=str(e)[:100], await (
) NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
logger.error(f"Error processing YouTube video: {e!s}") logger.error(f"Error processing YouTube video: {e!s}")
raise raise
@ -404,12 +422,21 @@ async def _process_file_upload(
else: else:
error_message = str(e)[:100] error_message = str(e)[:100]
# Update notification on failure # Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
await NotificationService.document_processing.notify_processing_completed( try:
session=session, # Refresh notification to ensure it's not stale after any rollback
notification=notification, await session.refresh(notification)
error_message=error_message, await (
) NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=error_message,
)
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
await task_logger.log_task_failure( await task_logger.log_task_failure(
log_entry, log_entry,
@ -564,15 +591,20 @@ async def _process_circleback_meeting(
{"error_type": type(e).__name__, "meeting_id": meeting_id}, {"error_type": type(e).__name__, "meeting_id": meeting_id},
) )
# Update notification on failure # Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
if notification: if notification:
await ( try:
NotificationService.document_processing.notify_processing_completed( # Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session, session=session,
notification=notification, notification=notification,
error_message=str(e)[:100], error_message=str(e)[:100],
) )
) except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
)
logger.error(f"Error processing Circleback meeting: {e!s}") logger.error(f"Error processing Circleback meeting: {e!s}")
raise raise