diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index 1338fe16b..4d7e6b2ef 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -43,6 +43,7 @@ from app.db import ( async_session_maker, get_async_session, ) +from app.observability import metrics as ot_metrics from app.schemas import ( GoogleDriveIndexRequest, MCPConnectorCreate, @@ -104,7 +105,9 @@ async def _run_indexing_heartbeat_loop(notification_id: int) -> None: await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL) try: get_heartbeat_redis_client().setex(key, HEARTBEAT_TTL_SECONDS, "alive") + ot_metrics.record_celery_heartbeat_refresh(heartbeat_type="connector") except Exception as e: + ot_metrics.record_celery_heartbeat_failure(heartbeat_type="connector") logger.warning( f"Failed to refresh Redis heartbeat for notification " f"{notification_id}: {e}" @@ -1338,7 +1341,13 @@ async def _run_indexing_with_notifications( get_heartbeat_redis_client().setex( heartbeat_key, HEARTBEAT_TTL_SECONDS, "0" ) + ot_metrics.record_celery_heartbeat_refresh( + heartbeat_type="connector" + ) except Exception as e: + ot_metrics.record_celery_heartbeat_failure( + heartbeat_type="connector" + ) logger.warning(f"Failed to set initial Redis heartbeat: {e}") # Start a background coroutine that refreshes the @@ -1397,8 +1406,14 @@ async def _run_indexing_with_notifications( get_heartbeat_redis_client().setex( heartbeat_key, HEARTBEAT_TTL_SECONDS, str(indexed_count) ) + ot_metrics.record_celery_heartbeat_refresh( + heartbeat_type="connector" + ) except Exception as e: # Don't let Redis errors break the indexing + ot_metrics.record_celery_heartbeat_failure( + heartbeat_type="connector" + ) logger.warning(f"Failed to set Redis heartbeat: {e}") try: diff --git a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py index c78e376bd..1f9609968 100644 --- a/surfsense_backend/app/tasks/celery_tasks/document_tasks.py +++ b/surfsense_backend/app/tasks/celery_tasks/document_tasks.py @@ -9,6 +9,7 @@ from uuid import UUID from app.celery_app import celery_app from app.config import config +from app.observability import metrics as ot_metrics from app.services.notification_service import NotificationService from app.services.task_logging_service import TaskLoggingService from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task @@ -59,7 +60,9 @@ def _start_heartbeat(notification_id: int) -> None: try: key = _get_heartbeat_key(notification_id) _get_doc_heartbeat_redis().setex(key, HEARTBEAT_TTL_SECONDS, "started") + ot_metrics.record_celery_heartbeat_refresh(heartbeat_type="document") except Exception as e: + ot_metrics.record_celery_heartbeat_failure(heartbeat_type="document") logger.warning( f"Failed to set initial heartbeat for notification {notification_id}: {e}" ) @@ -87,7 +90,9 @@ async def _run_heartbeat_loop(notification_id: int): await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL) try: _get_doc_heartbeat_redis().setex(key, HEARTBEAT_TTL_SECONDS, "alive") + ot_metrics.record_celery_heartbeat_refresh(heartbeat_type="document") except Exception as e: + ot_metrics.record_celery_heartbeat_failure(heartbeat_type="document") logger.warning( f"Failed to refresh heartbeat for notification {notification_id}: {e}" )