mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-05-19 18:45:15 +02:00
Merge pull request #1306 from mvanhorn/osc/1295-github-connector-heartbeat
fix(connectors): refresh Redis heartbeat during long Phase 1 indexing
This commit is contained in:
commit
7d782c7837
1 changed files with 48 additions and 0 deletions
|
|
@ -81,6 +81,36 @@ _heartbeat_redis_client: redis.Redis | None = None
|
||||||
|
|
||||||
# Redis key TTL - notification is stale if no heartbeat in this time
|
# Redis key TTL - notification is stale if no heartbeat in this time
|
||||||
HEARTBEAT_TTL_SECONDS = 120 # 2 minutes
|
HEARTBEAT_TTL_SECONDS = 120 # 2 minutes
|
||||||
|
# How often the background loop refreshes the Redis key. Must be < TTL so
|
||||||
|
# the key cannot expire between refreshes when the indexing function is
|
||||||
|
# doing blocking work (e.g. gitingest in Phase 1) that doesn't trigger
|
||||||
|
# on_heartbeat_callback.
|
||||||
|
HEARTBEAT_REFRESH_INTERVAL = 60
|
||||||
|
|
||||||
|
|
||||||
|
async def _run_indexing_heartbeat_loop(notification_id: int) -> None:
|
||||||
|
"""Background coroutine that refreshes the Redis heartbeat every
|
||||||
|
HEARTBEAT_REFRESH_INTERVAL seconds while a connector indexing task is
|
||||||
|
running.
|
||||||
|
|
||||||
|
Mirrors `_run_heartbeat_loop` in app/tasks/celery_tasks/document_tasks.py.
|
||||||
|
Cancelled via heartbeat_task.cancel() when the indexing call returns
|
||||||
|
(success or failure). If the worker dies, the coroutine dies with it
|
||||||
|
and the Redis key expires naturally on its TTL.
|
||||||
|
"""
|
||||||
|
key = _get_heartbeat_key(notification_id)
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
await asyncio.sleep(HEARTBEAT_REFRESH_INTERVAL)
|
||||||
|
try:
|
||||||
|
get_heartbeat_redis_client().setex(key, HEARTBEAT_TTL_SECONDS, "alive")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(
|
||||||
|
f"Failed to refresh Redis heartbeat for notification "
|
||||||
|
f"{notification_id}: {e}"
|
||||||
|
)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
pass # Normal cancellation when the indexing task completes
|
||||||
|
|
||||||
|
|
||||||
def get_heartbeat_redis_client() -> redis.Redis:
|
def get_heartbeat_redis_client() -> redis.Redis:
|
||||||
|
|
@ -1284,6 +1314,7 @@ async def _run_indexing_with_notifications(
|
||||||
|
|
||||||
notification = None
|
notification = None
|
||||||
connector_lock_acquired = False
|
connector_lock_acquired = False
|
||||||
|
heartbeat_task: asyncio.Task | None = None
|
||||||
# Track indexed count for retry notifications and heartbeat
|
# Track indexed count for retry notifications and heartbeat
|
||||||
current_indexed_count = 0
|
current_indexed_count = 0
|
||||||
|
|
||||||
|
|
@ -1329,6 +1360,16 @@ async def _run_indexing_with_notifications(
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"Failed to set initial Redis heartbeat: {e}")
|
logger.warning(f"Failed to set initial Redis heartbeat: {e}")
|
||||||
|
|
||||||
|
# Start a background coroutine that refreshes the
|
||||||
|
# heartbeat every HEARTBEAT_REFRESH_INTERVAL seconds.
|
||||||
|
# Without this the cleanup_stale_indexing_notifications
|
||||||
|
# task can mark the doc failed when on_heartbeat_callback
|
||||||
|
# doesn't fire — for example during the GitHub
|
||||||
|
# connector's Phase 1 gitingest blocking call (#1295).
|
||||||
|
heartbeat_task = asyncio.create_task(
|
||||||
|
_run_indexing_heartbeat_loop(notification.id)
|
||||||
|
)
|
||||||
|
|
||||||
# Update notification to fetching stage
|
# Update notification to fetching stage
|
||||||
if notification:
|
if notification:
|
||||||
await NotificationService.connector_indexing.notify_indexing_progress(
|
await NotificationService.connector_indexing.notify_indexing_progress(
|
||||||
|
|
@ -1619,6 +1660,13 @@ async def _run_indexing_with_notifications(
|
||||||
except Exception as notif_error:
|
except Exception as notif_error:
|
||||||
logger.error(f"Failed to update notification: {notif_error!s}")
|
logger.error(f"Failed to update notification: {notif_error!s}")
|
||||||
finally:
|
finally:
|
||||||
|
# Stop the background heartbeat refresher BEFORE deleting the
|
||||||
|
# Redis key, so the loop cannot race and re-create the key
|
||||||
|
# after we delete it.
|
||||||
|
if heartbeat_task is not None:
|
||||||
|
heartbeat_task.cancel()
|
||||||
|
with suppress(Exception):
|
||||||
|
await asyncio.gather(heartbeat_task, return_exceptions=True)
|
||||||
# Clean up Redis heartbeat key when task completes (success or failure)
|
# Clean up Redis heartbeat key when task completes (success or failure)
|
||||||
if notification:
|
if notification:
|
||||||
try:
|
try:
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue