diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index bb20da65d..c10838ed6 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -81,6 +81,38 @@ _heartbeat_redis_client: redis.Redis | None = None # Redis key TTL - notification is stale if no heartbeat in this time HEARTBEAT_TTL_SECONDS = 120 # 2 minutes +# How often the background loop refreshes the Redis key. Must be < TTL so +# the key cannot expire between refreshes when the indexing function is +# doing blocking work (e.g. gitingest in Phase 1) that doesn't trigger +# on_heartbeat_callback. +HEARTBEAT_REFRESH_INTERVAL = 60 + + +async def _run_indexing_heartbeat_loop(notification_id: int) -> None: + """Background coroutine that refreshes the Redis heartbeat every + HEARTBEAT_REFRESH_INTERVAL seconds while a connector indexing task is + running. + + Mirrors `_run_heartbeat_loop` in app/tasks/celery_tasks/document_tasks.py. + Cancelled via heartbeat_task.cancel() when the indexing call returns + (success or failure). If the worker dies, the coroutine dies with it + and the Redis key expires naturally on its TTL. + """ + key = _get_heartbeat_key(notification_id) + try: + while True: + await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL) + try: + get_heartbeat_redis_client().setex( + key, HEARTBEAT_TTL_SECONDS, "alive" + ) + except Exception as e: + logger.warning( + f"Failed to refresh Redis heartbeat for notification " + f"{notification_id}: {e}" + ) + except asyncio.CancelledError: + pass # Normal cancellation when the indexing task completes def get_heartbeat_redis_client() -> redis.Redis: @@ -1457,6 +1489,7 @@ async def _run_indexing_with_notifications( notification = None connector_lock_acquired = False + heartbeat_task: asyncio.Task | None = None # Track indexed count for retry notifications and heartbeat current_indexed_count = 0 @@ -1502,6 +1535,16 @@ async def _run_indexing_with_notifications( except Exception as e: logger.warning(f"Failed to set initial Redis heartbeat: {e}") + # Start a background coroutine that refreshes the + # heartbeat every HEARTBEAT_REFRESH_INTERVAL seconds. + # Without this the cleanup_stale_indexing_notifications + # task can mark the doc failed when on_heartbeat_callback + # doesn't fire — for example during the GitHub + # connector's Phase 1 gitingest blocking call (#1295). + heartbeat_task = asyncio.create_task( + _run_indexing_heartbeat_loop(notification.id) + ) + # Update notification to fetching stage if notification: await NotificationService.connector_indexing.notify_indexing_progress( @@ -1792,6 +1835,13 @@ async def _run_indexing_with_notifications( except Exception as notif_error: logger.error(f"Failed to update notification: {notif_error!s}") finally: + # Stop the background heartbeat refresher BEFORE deleting the + # Redis key, so the loop cannot race and re-create the key + # after we delete it. + if heartbeat_task is not None: + heartbeat_task.cancel() + with suppress(Exception): + await asyncio.gather(heartbeat_task, return_exceptions=True) # Clean up Redis heartbeat key when task completes (success or failure) if notification: try: