fix(connectors): refresh Redis heartbeat during long Phase 1 indexing

Closes #1295

The connector indexing route's `_run_indexing_with_notifications` set the
Redis heartbeat key once at the start of indexing and relied on
`on_heartbeat_callback` (only fired in Phase 2 per-document loops) to
refresh it. The GitHub connector's Phase 1 runs `gitingest` as a blocking
subprocess via `asyncio.to_thread`, so for any repo whose ingest takes
longer than the 2-minute TTL, the key expires before Phase 2 starts. The
`cleanup_stale_indexing_notifications_task` then marks the document as
failed with the misleading "Sync was interrupted unexpectedly. Please
retry." message — even though the indexing thread is still running and
gitingest's own subprocess timeout is 900 seconds.

Add a background asyncio coroutine that refreshes the Redis key every
60 seconds for the duration of the indexing call. This is the same
pattern already used by `_run_heartbeat_loop` in
app/tasks/celery_tasks/document_tasks.py, adapted to use the route's
get_heartbeat_redis_client() and _get_heartbeat_key() helpers.

Cancellation runs in the `finally` block BEFORE the heartbeat-key
delete so the loop cannot race and re-create the key after we have
deleted it. The new `HEARTBEAT_REFRESH_INTERVAL = 60` constant mirrors
the celery task module's value.
This commit is contained in:
Matt Van Horn 2026-04-26 02:46:43 -07:00
parent 7c4d1a6af6
commit 456dd7417c
No known key found for this signature in database

View file

@ -81,6 +81,38 @@ _heartbeat_redis_client: redis.Redis | None = None
# Redis key TTL - notification is stale if no heartbeat in this time # Redis key TTL - notification is stale if no heartbeat in this time
HEARTBEAT_TTL_SECONDS = 120 # 2 minutes HEARTBEAT_TTL_SECONDS = 120 # 2 minutes
# Interval, in seconds, between refreshes of the Redis heartbeat key by the
# background loop. Must be < HEARTBEAT_TTL_SECONDS so the key cannot expire
# between refreshes when the indexing function is doing blocking work
# (e.g. gitingest in Phase 1) that doesn't trigger on_heartbeat_callback.
HEARTBEAT_REFRESH_INTERVAL = 60  # seconds
async def _run_indexing_heartbeat_loop(notification_id: int) -> None:
    """Keep the Redis heartbeat key for an indexing notification alive.

    Sleeps for HEARTBEAT_REFRESH_INTERVAL seconds, then rewrites the
    heartbeat key with a TTL of HEARTBEAT_TTL_SECONDS, and repeats until the
    task is cancelled. The first refresh therefore happens only after one
    full interval has elapsed.

    Mirrors `_run_heartbeat_loop` in app/tasks/celery_tasks/document_tasks.py.
    The owner stops the loop with heartbeat_task.cancel() once the indexing
    call has returned (success or failure). If the worker dies, the
    coroutine dies with it and the Redis key expires naturally on its TTL.

    Args:
        notification_id: ID of the notification whose heartbeat key is
            refreshed.
    """
    heartbeat_key = _get_heartbeat_key(notification_id)

    def _refresh_once() -> None:
        # Best-effort: a failed refresh is logged and the loop keeps going,
        # so a transient Redis error does not end the heartbeat.
        try:
            redis_client = get_heartbeat_redis_client()
            redis_client.setex(heartbeat_key, HEARTBEAT_TTL_SECONDS, "alive")
        except Exception as exc:
            logger.warning(
                f"Failed to refresh Redis heartbeat for notification "
                f"{notification_id}: {exc}"
            )

    try:
        while True:
            # The sleep is the only await in this coroutine, so it is the
            # only point at which cancellation can be delivered; the Redis
            # write below is never interrupted half-way.
            await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL)
            _refresh_once()
    except asyncio.CancelledError:
        # Normal cancellation when the indexing task completes.
        return
def get_heartbeat_redis_client() -> redis.Redis: def get_heartbeat_redis_client() -> redis.Redis:
@ -1457,6 +1489,7 @@ async def _run_indexing_with_notifications(
notification = None notification = None
connector_lock_acquired = False connector_lock_acquired = False
heartbeat_task: asyncio.Task | None = None
# Track indexed count for retry notifications and heartbeat # Track indexed count for retry notifications and heartbeat
current_indexed_count = 0 current_indexed_count = 0
@ -1502,6 +1535,16 @@ async def _run_indexing_with_notifications(
except Exception as e: except Exception as e:
logger.warning(f"Failed to set initial Redis heartbeat: {e}") logger.warning(f"Failed to set initial Redis heartbeat: {e}")
# Start a background coroutine that refreshes the
# heartbeat every HEARTBEAT_REFRESH_INTERVAL seconds.
# Without this the cleanup_stale_indexing_notifications
# task can mark the doc failed when on_heartbeat_callback
# doesn't fire — for example during the GitHub
# connector's Phase 1 gitingest blocking call (#1295).
heartbeat_task = asyncio.create_task(
_run_indexing_heartbeat_loop(notification.id)
)
# Update notification to fetching stage # Update notification to fetching stage
if notification: if notification:
await NotificationService.connector_indexing.notify_indexing_progress( await NotificationService.connector_indexing.notify_indexing_progress(
@ -1792,6 +1835,13 @@ async def _run_indexing_with_notifications(
except Exception as notif_error: except Exception as notif_error:
logger.error(f"Failed to update notification: {notif_error!s}") logger.error(f"Failed to update notification: {notif_error!s}")
finally: finally:
# Stop the background heartbeat refresher BEFORE deleting the
# Redis key, so the loop cannot race and re-create the key
# after we delete it.
if heartbeat_task is not None:
heartbeat_task.cancel()
with suppress(Exception):
await asyncio.gather(heartbeat_task, return_exceptions=True)
# Clean up Redis heartbeat key when task completes (success or failure) # Clean up Redis heartbeat key when task completes (success or failure)
if notification: if notification:
try: try: