feat(tasks): record indexing heartbeat metrics

This commit is contained in:
Anish Sarkar 2026-05-22 13:50:32 +05:30
parent 7c07c220fc
commit 87a4dcfd05
2 changed files with 20 additions and 0 deletions

View file

@ -43,6 +43,7 @@ from app.db import (
async_session_maker, async_session_maker,
get_async_session, get_async_session,
) )
from app.observability import metrics as ot_metrics
from app.schemas import ( from app.schemas import (
GoogleDriveIndexRequest, GoogleDriveIndexRequest,
MCPConnectorCreate, MCPConnectorCreate,
@ -104,7 +105,9 @@ async def _run_indexing_heartbeat_loop(notification_id: int) -> None:
await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL) await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL)
try: try:
get_heartbeat_redis_client().setex(key, HEARTBEAT_TTL_SECONDS, "alive") get_heartbeat_redis_client().setex(key, HEARTBEAT_TTL_SECONDS, "alive")
ot_metrics.record_celery_heartbeat_refresh(heartbeat_type="connector")
except Exception as e: except Exception as e:
ot_metrics.record_celery_heartbeat_failure(heartbeat_type="connector")
logger.warning( logger.warning(
f"Failed to refresh Redis heartbeat for notification " f"Failed to refresh Redis heartbeat for notification "
f"{notification_id}: {e}" f"{notification_id}: {e}"
@ -1338,7 +1341,13 @@ async def _run_indexing_with_notifications(
get_heartbeat_redis_client().setex( get_heartbeat_redis_client().setex(
heartbeat_key, HEARTBEAT_TTL_SECONDS, "0" heartbeat_key, HEARTBEAT_TTL_SECONDS, "0"
) )
ot_metrics.record_celery_heartbeat_refresh(
heartbeat_type="connector"
)
except Exception as e: except Exception as e:
ot_metrics.record_celery_heartbeat_failure(
heartbeat_type="connector"
)
logger.warning(f"Failed to set initial Redis heartbeat: {e}") logger.warning(f"Failed to set initial Redis heartbeat: {e}")
# Start a background coroutine that refreshes the # Start a background coroutine that refreshes the
@ -1397,8 +1406,14 @@ async def _run_indexing_with_notifications(
get_heartbeat_redis_client().setex( get_heartbeat_redis_client().setex(
heartbeat_key, HEARTBEAT_TTL_SECONDS, str(indexed_count) heartbeat_key, HEARTBEAT_TTL_SECONDS, str(indexed_count)
) )
ot_metrics.record_celery_heartbeat_refresh(
heartbeat_type="connector"
)
except Exception as e: except Exception as e:
# Don't let Redis errors break the indexing # Don't let Redis errors break the indexing
ot_metrics.record_celery_heartbeat_failure(
heartbeat_type="connector"
)
logger.warning(f"Failed to set Redis heartbeat: {e}") logger.warning(f"Failed to set Redis heartbeat: {e}")
try: try:

View file

@ -9,6 +9,7 @@ from uuid import UUID
from app.celery_app import celery_app from app.celery_app import celery_app
from app.config import config from app.config import config
from app.observability import metrics as ot_metrics
from app.services.notification_service import NotificationService from app.services.notification_service import NotificationService
from app.services.task_logging_service import TaskLoggingService from app.services.task_logging_service import TaskLoggingService
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
@ -59,7 +60,9 @@ def _start_heartbeat(notification_id: int) -> None:
try: try:
key = _get_heartbeat_key(notification_id) key = _get_heartbeat_key(notification_id)
_get_doc_heartbeat_redis().setex(key, HEARTBEAT_TTL_SECONDS, "started") _get_doc_heartbeat_redis().setex(key, HEARTBEAT_TTL_SECONDS, "started")
ot_metrics.record_celery_heartbeat_refresh(heartbeat_type="document")
except Exception as e: except Exception as e:
ot_metrics.record_celery_heartbeat_failure(heartbeat_type="document")
logger.warning( logger.warning(
f"Failed to set initial heartbeat for notification {notification_id}: {e}" f"Failed to set initial heartbeat for notification {notification_id}: {e}"
) )
@ -87,7 +90,9 @@ async def _run_heartbeat_loop(notification_id: int):
await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL) await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL)
try: try:
_get_doc_heartbeat_redis().setex(key, HEARTBEAT_TTL_SECONDS, "alive") _get_doc_heartbeat_redis().setex(key, HEARTBEAT_TTL_SECONDS, "alive")
ot_metrics.record_celery_heartbeat_refresh(heartbeat_type="document")
except Exception as e: except Exception as e:
ot_metrics.record_celery_heartbeat_failure(heartbeat_type="document")
logger.warning( logger.warning(
f"Failed to refresh heartbeat for notification {notification_id}: {e}" f"Failed to refresh heartbeat for notification {notification_id}: {e}"
) )