feat: implement Redis heartbeat tracking for connector indexing tasks and update stale notification cleanup logic

This commit is contained in:
Anish Sarkar 2026-02-02 00:18:47 +05:30
parent 085653d3e3
commit 05d1d6ac04
2 changed files with 139 additions and 69 deletions

View file

@ -1,18 +1,25 @@
"""Celery task to detect and mark stale connector indexing notifications as failed.
This task runs periodically (every 5 minutes by default) to find notifications
that are stuck in "in_progress" status but haven't received a heartbeat update
in the configured timeout period. These are marked as "failed" to prevent the
frontend from showing a perpetual "syncing" state.
that are stuck in "in_progress" status but don't have an active Redis heartbeat key.
These are marked as "failed" to prevent the frontend from showing a perpetual "syncing" state.
Detection mechanism:
- Active indexing tasks set a Redis key with TTL (2 minutes) as a heartbeat
- If the task crashes, the Redis key expires automatically
- This cleanup task checks for in-progress notifications without a Redis heartbeat key
- Such notifications are marked as failed with a single batch UPDATE statement (one database round trip, regardless of how many notifications are stale)
"""
import json
import logging
from datetime import UTC, datetime, timedelta
import os
from datetime import UTC, datetime
from sqlalchemy import and_
import redis
from sqlalchemy import and_, text
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.future import select
from sqlalchemy.orm.attributes import flag_modified
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
@ -21,10 +28,22 @@ from app.db import Notification
logger = logging.getLogger(__name__)
# Timeout in minutes - notifications without heartbeat for this long are marked as failed
# Should be longer than HEARTBEAT_INTERVAL_SECONDS (30s) * a reasonable number of missed heartbeats
# 5 minutes = 10 missed heartbeats, which is a reasonable threshold
STALE_NOTIFICATION_TIMEOUT_MINUTES = 5
# Redis client for checking heartbeats
_redis_client: redis.Redis | None = None
def get_redis_client() -> redis.Redis:
    """Return the module-wide Redis client used for heartbeat checks.

    The client is created on first use and cached in the module-level
    ``_redis_client`` variable; later calls return the cached instance.
    The connection URL is read from the ``CELERY_BROKER_URL`` environment
    variable, falling back to ``redis://localhost:6379/0`` when it is unset.
    The client is created with ``decode_responses=True``.
    """
    global _redis_client

    # Fast path: a client was already built by an earlier call.
    if _redis_client is not None:
        return _redis_client

    broker_url = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
    _redis_client = redis.from_url(broker_url, decode_responses=True)
    return _redis_client
def _get_heartbeat_key(notification_id: int) -> str:
"""Generate Redis key for notification heartbeat."""
return f"indexing:heartbeat:{notification_id}"
def get_celery_session_maker():
@ -45,9 +64,9 @@ def cleanup_stale_indexing_notifications_task():
This task finds notifications that:
- Have type = 'connector_indexing'
- Have metadata.status = 'in_progress'
- Have updated_at older than STALE_NOTIFICATION_TIMEOUT_MINUTES
- Do NOT have a corresponding Redis heartbeat key (meaning task crashed)
And marks them as failed with an appropriate error message.
And marks them as failed with O(1) batch UPDATE.
"""
import asyncio
@ -61,84 +80,83 @@ def cleanup_stale_indexing_notifications_task():
async def _cleanup_stale_notifications():
"""Find and mark stale connector indexing notifications as failed."""
"""Find and mark stale connector indexing notifications as failed.
Uses Redis TTL-based detection:
1. Find all in-progress notifications
2. Check which ones are missing their Redis heartbeat key
3. Mark those as failed with O(1) batch UPDATE using JSONB || operator
"""
async with get_celery_session_maker()() as session:
try:
# Calculate the cutoff time
cutoff_time = datetime.now(UTC) - timedelta(
minutes=STALE_NOTIFICATION_TIMEOUT_MINUTES
)
# Find stale notifications:
# - type = 'connector_indexing'
# - metadata->>'status' = 'in_progress'
# - updated_at < cutoff_time
# Find all in-progress connector indexing notifications
result = await session.execute(
select(Notification).filter(
select(Notification.id).where(
and_(
Notification.type == "connector_indexing",
Notification.notification_metadata["status"].astext
== "in_progress",
Notification.updated_at < cutoff_time,
)
)
)
stale_notifications = result.scalars().all()
in_progress_ids = [row[0] for row in result.fetchall()]
if not stale_notifications:
logger.debug("No stale connector indexing notifications found")
if not in_progress_ids:
logger.debug("No in-progress connector indexing notifications found")
return
# Check which ones are missing heartbeat keys in Redis
redis_client = get_redis_client()
stale_notification_ids = []
for notification_id in in_progress_ids:
heartbeat_key = _get_heartbeat_key(notification_id)
if not redis_client.exists(heartbeat_key):
stale_notification_ids.append(notification_id)
if not stale_notification_ids:
logger.debug(
f"All {len(in_progress_ids)} in-progress notifications have active Redis heartbeats"
)
return
logger.warning(
f"Found {len(stale_notifications)} stale connector indexing notifications "
f"(no heartbeat for >{STALE_NOTIFICATION_TIMEOUT_MINUTES} minutes)"
f"Found {len(stale_notification_ids)} stale connector indexing notifications "
f"(no Redis heartbeat key): {stale_notification_ids}"
)
# Mark each stale notification as failed
for notification in stale_notifications:
try:
# Get current indexed count from metadata if available
indexed_count = notification.notification_metadata.get(
"indexed_count", 0
)
connector_name = notification.notification_metadata.get(
"connector_name", "Unknown"
)
# O(1) Batch UPDATE using JSONB || operator
# This merges the update data into existing notification_metadata
# Also updates title and message for proper UI display
error_message = (
"Something went wrong while syncing your content. Please retry."
)
# Calculate how long it's been stale
stale_duration = datetime.now(UTC) - notification.updated_at
stale_minutes = int(stale_duration.total_seconds() / 60)
update_data = {
"status": "failed",
"completed_at": datetime.now(UTC).isoformat(),
"error_message": error_message,
"sync_stage": "failed",
}
# Update notification metadata
notification.notification_metadata["status"] = "failed"
notification.notification_metadata["completed_at"] = datetime.now(
UTC
).isoformat()
notification.notification_metadata["error_message"] = (
f"Indexing task appears to have crashed or timed out. "
f"No activity detected for {stale_minutes} minutes. "
f"Please try syncing again."
)
await session.execute(
text("""
UPDATE notifications
SET metadata = metadata || CAST(:update_json AS jsonb),
title = 'Failed: ' || COALESCE(metadata->>'connector_name', 'Connector'),
message = :display_message
WHERE id = ANY(:ids)
"""),
{
"update_json": json.dumps(update_data),
"display_message": f"{error_message}",
"ids": stale_notification_ids,
},
)
# Flag the JSONB column as modified for SQLAlchemy to detect the change
flag_modified(notification, "notification_metadata")
logger.info(
f"Marking notification {notification.id} for connector '{connector_name}' as failed "
f"(stale for {stale_minutes} minutes, indexed {indexed_count} items before failure)"
)
except Exception as e:
logger.error(
f"Error marking notification {notification.id} as failed: {e!s}",
exc_info=True,
)
continue
# Commit all changes
await session.commit()
logger.info(
f"Successfully marked {len(stale_notifications)} stale notifications as failed"
f"Successfully marked {len(stale_notification_ids)} stale notifications as failed (batch UPDATE)"
)
except Exception as e: