continue indexing when notification creation fails

This commit is contained in:
CREDO23 2026-06-17 15:06:05 +02:00
parent e37b9b5e31
commit 6d1879ffcb

View file

@ -602,23 +602,29 @@ async def _process_file_upload(
# Create notification for document processing
logger.info(f"[_process_file_upload] Creating notification for: {filename}")
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="FILE",
document_name=filename,
search_space_id=search_space_id,
file_size=file_size,
notification = None
heartbeat_task = None
try:
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="FILE",
document_name=filename,
search_space_id=search_space_id,
file_size=file_size,
)
)
logger.info(
f"[_process_file_upload] Notification created with ID: {notification.id}"
)
_start_heartbeat(notification.id)
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id))
except Exception:
logger.warning(
f"[_process_file_upload] Failed to create notification for: {filename}",
exc_info=True,
)
)
logger.info(
f"[_process_file_upload] Notification created with ID: {notification.id if notification else 'None'}"
)
# Start Redis heartbeat for stale task detection
_start_heartbeat(notification.id)
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id))
log_entry = await task_logger.log_task_start(
task_name="process_file_upload",
@ -646,23 +652,21 @@ async def _process_file_upload(
# Update notification on success
if result:
await (
NotificationService.document_processing.notify_processing_completed(
if notification:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
)
else:
# Duplicate detected
await (
NotificationService.document_processing.notify_processing_completed(
if notification:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Document already exists (duplicate)",
)
)
except Exception as e:
# Import here to avoid circular dependencies
@ -691,13 +695,13 @@ async def _process_file_upload(
error_message = str(credit_error)
# Create a dedicated insufficient credits notification
try:
# First, mark the processing notification as failed
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Insufficient credits",
)
if notification:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Insufficient credits",
)
# Then create a separate insufficient_credits notification for better UX
await NotificationService.insufficient_credits.notify_insufficient_credits(
@ -717,12 +721,13 @@ async def _process_file_upload(
# HTTPException with page limit message but no detailed cause
error_message = str(e.detail)
try:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=error_message,
)
if notification:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=error_message,
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
@ -731,13 +736,13 @@ async def _process_file_upload(
error_message = str(e)[:100]
# Update notification on failure - wrapped in try-except to ensure it doesn't fail silently
try:
# Refresh notification to ensure it's not stale after any rollback
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=error_message,
)
if notification:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=error_message,
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
@ -753,8 +758,10 @@ async def _process_file_upload(
raise
finally:
# Stop heartbeat — key deleted on success, expires on crash
heartbeat_task.cancel()
_stop_heartbeat(notification.id)
if heartbeat_task:
heartbeat_task.cancel()
if notification:
_stop_heartbeat(notification.id)
@celery_app.task(name="process_file_upload_with_document", bind=True)
@ -894,29 +901,36 @@ async def _process_file_with_document(
logger.info(
f"[_process_file_with_document] Creating notification for: {filename}"
)
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="FILE",
document_name=filename,
search_space_id=search_space_id,
file_size=file_size,
notification = None
heartbeat_task = None
try:
notification = (
await NotificationService.document_processing.notify_processing_started(
session=session,
user_id=UUID(user_id),
document_type="FILE",
document_name=filename,
search_space_id=search_space_id,
file_size=file_size,
)
)
)
# Store document_id in notification metadata so cleanup task can find the document
if notification and notification.notification_metadata is not None:
notification.notification_metadata["document_id"] = document_id
from sqlalchemy.orm.attributes import flag_modified
# Store document_id in notification metadata so cleanup task can find the document
if notification.notification_metadata is not None:
notification.notification_metadata["document_id"] = document_id
from sqlalchemy.orm.attributes import flag_modified
flag_modified(notification, "notification_metadata")
await session.commit()
await session.refresh(notification)
flag_modified(notification, "notification_metadata")
await session.commit()
await session.refresh(notification)
# Start Redis heartbeat for stale task detection
_start_heartbeat(notification.id)
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id))
_start_heartbeat(notification.id)
heartbeat_task = asyncio.create_task(_run_heartbeat_loop(notification.id))
except Exception:
logger.warning(
f"[_process_file_with_document] Failed to create notification for: {filename}",
exc_info=True,
)
log_entry = await task_logger.log_task_start(
task_name="process_file_upload_with_document",
@ -956,14 +970,13 @@ async def _process_file_with_document(
# Update notification on success
if result:
await (
NotificationService.document_processing.notify_processing_completed(
if notification:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
document_id=result.id,
chunks_count=None,
)
)
logger.info(
f"[_process_file_with_document] Successfully processed document {document_id}"
)
@ -972,13 +985,12 @@ async def _process_file_with_document(
document.status = DocumentStatus.failed("Duplicate content detected")
document.updated_at = get_current_timestamp()
await session.commit()
await (
NotificationService.document_processing.notify_processing_completed(
if notification:
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Document already exists (duplicate)",
)
)
except Exception as e:
# Import here to avoid circular dependencies
@ -1009,12 +1021,13 @@ async def _process_file_with_document(
# Handle insufficient-credit errors with dedicated notification
if credit_error is not None:
try:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Insufficient credits",
)
if notification:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message="Insufficient credits",
)
await NotificationService.insufficient_credits.notify_insufficient_credits(
session=session,
user_id=UUID(user_id),
@ -1031,12 +1044,13 @@ async def _process_file_with_document(
else:
# Update notification on failure
try:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
if notification:
await session.refresh(notification)
await NotificationService.document_processing.notify_processing_completed(
session=session,
notification=notification,
error_message=str(e)[:100],
)
except Exception as notif_error:
logger.error(
f"Failed to update notification on failure: {notif_error!s}"
@ -1053,8 +1067,10 @@ async def _process_file_with_document(
finally:
# Stop heartbeat — key deleted on success, expires on crash
heartbeat_task.cancel()
_stop_heartbeat(notification.id)
if heartbeat_task:
heartbeat_task.cancel()
if notification:
_stop_heartbeat(notification.id)
# Clean up temp file
if os.path.exists(temp_path):