feat: enhance stale notification cleanup task to mark associated documents as failed

Anish Sarkar 2026-02-06 05:31:45 +05:30
parent ed2fc5c636
commit 00a617ef17


@@ -4,33 +4,41 @@ This task runs periodically (every 5 minutes by default) to find notifications
 that are stuck in "in_progress" status but don't have an active Redis heartbeat key.
 These are marked as "failed" to prevent the frontend from showing a perpetual "syncing" state.
+Additionally, it cleans up documents stuck in pending/processing state that belong
+to connectors with stale notifications.

 Detection mechanism:
 - Active indexing tasks set a Redis key with TTL (2 minutes) as a heartbeat
 - If the task crashes, the Redis key expires automatically
 - This cleanup task checks for in-progress notifications without a Redis heartbeat key
 - Such notifications are marked as failed with O(1) batch UPDATE
+- Documents with pending/processing status for those connectors are also marked as failed
 """
+import contextlib
 import json
 import logging
 import os
 from datetime import UTC, datetime

 import redis
-from sqlalchemy import and_, text
+from sqlalchemy import and_, or_, text
 from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
 from sqlalchemy.future import select
 from sqlalchemy.pool import NullPool

 from app.celery_app import celery_app
 from app.config import config
-from app.db import Notification
+from app.db import Document, DocumentStatus, Notification

 logger = logging.getLogger(__name__)

 # Redis client for checking heartbeats
 _redis_client: redis.Redis | None = None

+# Error message shown to users when sync is interrupted
+STALE_SYNC_ERROR_MESSAGE = "Sync was interrupted unexpectedly. Please retry."


 def get_redis_client() -> redis.Redis:
     """Get or create Redis client for heartbeat checking."""
@@ -70,6 +78,7 @@ def cleanup_stale_indexing_notifications_task():
     - Do NOT have a corresponding Redis heartbeat key (meaning task crashed)

     And marks them as failed with O(1) batch UPDATE.
+    Also marks associated pending/processing documents as failed.
     """
     import asyncio
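The "every 5 minutes by default" cadence in the module docstring comes from the Celery beat schedule, which lives outside this file and is untouched by this commit. A minimal sketch of what such an entry can look like, assuming a task name derived from the function above; the registered name and config mechanism in this repo may differ.

from app.celery_app import celery_app

celery_app.conf.beat_schedule = {
    "cleanup-stale-indexing-notifications": {
        # Assumed task path; the actual registered name may differ.
        "task": "app.tasks.cleanup_stale_indexing_notifications_task",
        "schedule": 300.0,  # seconds, i.e. every 5 minutes
    },
}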
@@ -86,15 +95,20 @@ async def _cleanup_stale_notifications():
     """Find and mark stale connector indexing notifications as failed.

     Uses Redis TTL-based detection:
-    1. Find all in-progress notifications
+    1. Find all in-progress notifications with their connector_id
     2. Check which ones are missing their Redis heartbeat key
     3. Mark those as failed with O(1) batch UPDATE using JSONB || operator
+    4. Mark associated documents (pending/processing) as failed
     """
     async with get_celery_session_maker()() as session:
         try:
             # Find all in-progress connector indexing notifications
+            # Fetch full metadata to properly extract connector_id
             result = await session.execute(
-                select(Notification.id).where(
+                select(
+                    Notification.id,
+                    Notification.notification_metadata,
+                ).where(
                     and_(
                         Notification.type == "connector_indexing",
                         Notification.notification_metadata["status"].astext
@@ -102,24 +116,37 @@ async def _cleanup_stale_notifications():
                     )
                 )
             )
-            in_progress_ids = [row[0] for row in result.fetchall()]
+            in_progress_rows = result.fetchall()

-            if not in_progress_ids:
+            if not in_progress_rows:
                 logger.debug("No in-progress connector indexing notifications found")
                 return

             # Check which ones are missing heartbeat keys in Redis
             redis_client = get_redis_client()
             stale_notification_ids = []
+            stale_connector_ids = []

-            for notification_id in in_progress_ids:
+            for row in in_progress_rows:
+                notification_id = row[0]
+                metadata = row[1]  # Full metadata dict
                 heartbeat_key = _get_heartbeat_key(notification_id)
                 if not redis_client.exists(heartbeat_key):
                     stale_notification_ids.append(notification_id)
+                    # Extract connector_id from metadata dict for document cleanup
+                    if metadata and isinstance(metadata, dict):
+                        connector_id = metadata.get("connector_id")
+                        logger.debug(
+                            f"Notification {notification_id} metadata: {metadata}, "
+                            f"connector_id: {connector_id}"
+                        )
+                        if connector_id is not None:
+                            with contextlib.suppress(ValueError, TypeError):
+                                stale_connector_ids.append(int(connector_id))

             if not stale_notification_ids:
                 logger.debug(
-                    f"All {len(in_progress_ids)} in-progress notifications have active Redis heartbeats"
+                    f"All {len(in_progress_rows)} in-progress notifications have active Redis heartbeats"
                 )
                 return
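The loop above only needs connector_id out of the notification metadata, and it wraps the int() conversion in contextlib.suppress(ValueError, TypeError) because a JSONB payload may carry the ID as a string, a number, or something unusable. A purely hypothetical payload to illustrate; field names other than "status" and "connector_id" are invented.

import contextlib

metadata = {
    "status": "in_progress",
    "connector_id": "42",   # stored as a string here; int("42") still succeeds
    "sync_stage": "fetching",
}

stale_connector_ids: list[int] = []
connector_id = metadata.get("connector_id")
if connector_id is not None:
    with contextlib.suppress(ValueError, TypeError):
        stale_connector_ids.append(int(connector_id))  # silently skips values like "n/a" or None

print(stale_connector_ids)  # [42]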
@@ -127,18 +154,17 @@ async def _cleanup_stale_notifications():
                 f"Found {len(stale_notification_ids)} stale connector indexing notifications "
                 f"(no Redis heartbeat key): {stale_notification_ids}"
             )
+            logger.info(
+                f"Connector IDs for document cleanup: {stale_connector_ids}"
+            )

-            # O(1) Batch UPDATE using JSONB || operator
+            # O(1) Batch UPDATE notifications using JSONB || operator
             # This merges the update data into existing notification_metadata
             # Also updates title and message for proper UI display
-            error_message = (
-                "Something went wrong while syncing your content. Please retry."
-            )
             update_data = {
                 "status": "failed",
                 "completed_at": datetime.now(UTC).isoformat(),
-                "error_message": error_message,
+                "error_message": STALE_SYNC_ERROR_MESSAGE,
                 "sync_stage": "failed",
             }
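The SQL string that consumes update_data sits between these two hunks and is unchanged, so the diff does not show it. For readers following along, a batch UPDATE of the shape the comments describe, merging the JSON payload into every stale row with the JSONB || operator, looks roughly like the sketch below. The table and column names are assumptions; only the bind-parameter names (:update_json, :display_message, :ids) are taken from the surrounding code.

from sqlalchemy import text

# Illustrative statement only; "notifications", "notification_metadata" and "message"
# are assumed names, not copied from the repo.
stale_notifications_update = text("""
    UPDATE notifications
    SET notification_metadata = notification_metadata || CAST(:update_json AS jsonb),
        message = :display_message
    WHERE id = ANY(:ids)
""")

# Executed inside _cleanup_stale_notifications() roughly as:
#   await session.execute(stale_notifications_update, {
#       "update_json": json.dumps(update_data),
#       "display_message": STALE_SYNC_ERROR_MESSAGE,
#       "ids": stale_notification_ids,
#   })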
@@ -152,16 +178,96 @@ async def _cleanup_stale_notifications():
                 """),
                 {
                     "update_json": json.dumps(update_data),
-                    "display_message": f"{error_message}",
+                    "display_message": STALE_SYNC_ERROR_MESSAGE,
                     "ids": stale_notification_ids,
                 },
             )
-            await session.commit()

             logger.info(
-                f"Successfully marked {len(stale_notification_ids)} stale notifications as failed (batch UPDATE)"
+                f"Successfully marked {len(stale_notification_ids)} stale notifications as failed"
             )

+            # ===== Clean up stuck documents for stale connectors =====
+            if stale_connector_ids:
+                await _cleanup_stuck_documents(session, stale_connector_ids)
+
+            await session.commit()
+
         except Exception as e:
             logger.error(f"Error cleaning up stale notifications: {e!s}", exc_info=True)
             await session.rollback()
+
+
+async def _cleanup_stuck_documents(session, connector_ids: list[int]):
+    """
+    Mark documents stuck in pending/processing state as failed for given connectors.
+
+    This ensures that when a connector sync is interrupted, all partially-processed
+    documents are marked with a clear error state instead of being stuck indefinitely.
+
+    Args:
+        session: Database session
+        connector_ids: List of connector IDs whose documents should be cleaned up
+    """
+    if not connector_ids:
+        return
+
+    try:
+        # Count documents that will be affected (for logging)
+        count_result = await session.execute(
+            select(Document.id).where(
+                and_(
+                    Document.connector_id.in_(connector_ids),
+                    or_(
+                        Document.status["state"].astext == DocumentStatus.PENDING,
+                        Document.status["state"].astext == DocumentStatus.PROCESSING,
+                    ),
+                )
+            )
+        )
+        stuck_doc_ids = [row[0] for row in count_result.fetchall()]
+
+        if not stuck_doc_ids:
+            logger.debug(f"No stuck documents found for connector IDs: {connector_ids}")
+            return
+
+        logger.warning(
+            f"Found {len(stuck_doc_ids)} stuck documents (pending/processing) "
+            f"for connector IDs {connector_ids}: {stuck_doc_ids[:20]}..."  # Log first 20
+        )
+
+        # O(1) Batch UPDATE: Mark all stuck documents as failed using JSONB
+        # The error message matches what we show in notifications
+        failed_status = DocumentStatus.failed(STALE_SYNC_ERROR_MESSAGE)
+
+        await session.execute(
+            text("""
+                UPDATE documents
+                SET status = CAST(:failed_status AS jsonb),
+                    updated_at = :now
+                WHERE connector_id = ANY(:connector_ids)
+                AND (
+                    status->>'state' = :pending_state
+                    OR status->>'state' = :processing_state
+                )
+            """),
+            {
+                "failed_status": json.dumps(failed_status),
+                "now": datetime.now(UTC),
+                "connector_ids": connector_ids,
+                "pending_state": DocumentStatus.PENDING,
+                "processing_state": DocumentStatus.PROCESSING,
+            },
+        )
+
+        logger.info(
+            f"Successfully marked {len(stuck_doc_ids)} stuck documents as failed "
+            f"for connector IDs: {connector_ids}"
+        )
+
+    except Exception as e:
+        logger.error(
+            f"Error cleaning up stuck documents for connectors {connector_ids}: {e!s}",
+            exc_info=True,
+        )
+        # Don't raise - let the notification cleanup continue even if document cleanup fails
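_cleanup_stuck_documents serializes DocumentStatus.failed(...) with json.dumps and casts the result to jsonb, and its WHERE clause filters on status->>'state'. That implies the helper returns a plain dict keyed by "state". The real DocumentStatus lives in app.db and is not shown in this diff; the sketch below only illustrates that assumed shape.

# Assumed shape of the helper, for illustration only; the real class in app.db
# may use different constant values or different fields for the error text.
class DocumentStatus:
    PENDING = "pending"
    PROCESSING = "processing"
    FAILED = "failed"

    @staticmethod
    def failed(error_message: str) -> dict:
        # Must be JSON-serializable so json.dumps(...) can feed CAST(:failed_status AS jsonb)
        return {"state": DocumentStatus.FAILED, "error": error_message}

# json.dumps(DocumentStatus.failed(STALE_SYNC_ERROR_MESSAGE)) would then produce:
#   {"state": "failed", "error": "Sync was interrupted unexpectedly. Please retry."}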