From 456dd7417cc4b8a497b60d49347f96f29765831b Mon Sep 17 00:00:00 2001 From: Matt Van Horn <455140+mvanhorn@users.noreply.github.com> Date: Sun, 26 Apr 2026 02:46:43 -0700 Subject: [PATCH] fix(connectors): refresh Redis heartbeat during long Phase 1 indexing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Closes #1295 The connector indexing route's `_run_indexing_with_notifications` set the Redis heartbeat key once at the start of indexing and relied on `on_heartbeat_callback` (only fired in Phase 2 per-document loops) to refresh it. The GitHub connector's Phase 1 runs `gitingest` as a blocking subprocess via `asyncio.to_thread`, so for any repo whose ingestion takes longer than the 2-minute TTL, the key expires before Phase 2 starts. The `cleanup_stale_indexing_notifications_task` then marks the document as failed with the misleading "Sync was interrupted unexpectedly. Please retry." message — even though the indexing thread is still running and gitingest's own subprocess timeout is 900 seconds. Add a background asyncio coroutine that refreshes the Redis key every 60 seconds for the duration of the indexing call. Same pattern already in use at app/tasks/celery_tasks/document_tasks.py:_run_heartbeat_loop, just adapted to use the route's get_heartbeat_redis_client() and _get_heartbeat_key() helpers. Cancellation runs in the `finally` block BEFORE the heartbeat-key delete so the loop cannot race and re-create the key after we have deleted it. The new `HEARTBEAT_REFRESH_INTERVAL = 60` constant mirrors the celery task module's value. 
--- .../routes/search_source_connectors_routes.py | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index bb20da65d..c10838ed6 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -81,6 +81,38 @@ _heartbeat_redis_client: redis.Redis | None = None # Redis key TTL - notification is stale if no heartbeat in this time HEARTBEAT_TTL_SECONDS = 120 # 2 minutes +# How often the background loop refreshes the Redis key. Must be < TTL so +# the key cannot expire between refreshes when the indexing function is +# doing blocking work (e.g. gitingest in Phase 1) that doesn't trigger +# on_heartbeat_callback. +HEARTBEAT_REFRESH_INTERVAL = 60 + + +async def _run_indexing_heartbeat_loop(notification_id: int) -> None: + """Background coroutine that refreshes the Redis heartbeat every + HEARTBEAT_REFRESH_INTERVAL seconds while a connector indexing task is + running. + + Mirrors `_run_heartbeat_loop` in app/tasks/celery_tasks/document_tasks.py. + Cancelled via heartbeat_task.cancel() when the indexing call returns + (success or failure). If the worker dies, the coroutine dies with it + and the Redis key expires naturally on its TTL. 
+ """ + key = _get_heartbeat_key(notification_id) + try: + while True: + await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL) + try: + get_heartbeat_redis_client().setex( + key, HEARTBEAT_TTL_SECONDS, "alive" + ) + except Exception as e: + logger.warning( + f"Failed to refresh Redis heartbeat for notification " + f"{notification_id}: {e}" + ) + except asyncio.CancelledError: + pass # Normal cancellation when the indexing task completes def get_heartbeat_redis_client() -> redis.Redis: @@ -1457,6 +1489,7 @@ async def _run_indexing_with_notifications( notification = None connector_lock_acquired = False + heartbeat_task: asyncio.Task | None = None # Track indexed count for retry notifications and heartbeat current_indexed_count = 0 @@ -1502,6 +1535,16 @@ async def _run_indexing_with_notifications( except Exception as e: logger.warning(f"Failed to set initial Redis heartbeat: {e}") + # Start a background coroutine that refreshes the + # heartbeat every HEARTBEAT_REFRESH_INTERVAL seconds. + # Without this the cleanup_stale_indexing_notifications + # task can mark the doc failed when on_heartbeat_callback + # doesn't fire — for example during the GitHub + # connector's Phase 1 gitingest blocking call (#1295). + heartbeat_task = asyncio.create_task( + _run_indexing_heartbeat_loop(notification.id) + ) + # Update notification to fetching stage if notification: await NotificationService.connector_indexing.notify_indexing_progress( @@ -1792,6 +1835,13 @@ async def _run_indexing_with_notifications( except Exception as notif_error: logger.error(f"Failed to update notification: {notif_error!s}") finally: + # Stop the background heartbeat refresher BEFORE deleting the + # Redis key, so the loop cannot race and re-create the key + # after we delete it. 
+ if heartbeat_task is not None: + heartbeat_task.cancel() + with suppress(Exception): + await asyncio.gather(heartbeat_task, return_exceptions=True) # Clean up Redis heartbeat key when task completes (success or failure) if notification: try: