From 024a683b4feff66e8d7c6bd6b1ef45885979f866 Mon Sep 17 00:00:00 2001
From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com>
Date: Sun, 1 Feb 2026 02:17:06 +0530
Subject: [PATCH] feat: add heartbeat callback support for long-running
 indexing tasks and implement stale notification cleanup task

---
 surfsense_backend/app/celery_app.py           |  11 ++
 .../connectors/composio_gmail_connector.py    |  15 ++
 .../composio_google_calendar_connector.py     |  14 ++
 .../composio_google_drive_connector.py        |  27 ++++
 .../routes/search_source_connectors_routes.py |  75 +++++++++-
 .../stale_notification_cleanup_task.py        | 140 ++++++++++++++++++
 .../app/tasks/composio_indexer.py             |   7 +
 .../connector_indexers/airtable_indexer.py    |  24 +++
 .../connector_indexers/bookstack_indexer.py   |  17 +++
 .../connector_indexers/clickup_indexer.py     |  18 +++
 .../connector_indexers/confluence_indexer.py  |  17 +++
 .../connector_indexers/discord_indexer.py     |  18 +++
 .../elasticsearch_indexer.py                  |  18 +++
 .../connector_indexers/github_indexer.py      |  18 +++
 .../google_calendar_indexer.py                |  17 +++
 .../google_drive_indexer.py                   |  28 ++++
 .../google_gmail_indexer.py                   |  18 +++
 .../tasks/connector_indexers/jira_indexer.py  |  17 +++
 .../connector_indexers/linear_indexer.py      |  18 +++
 .../tasks/connector_indexers/luma_indexer.py  |  17 +++
 .../connector_indexers/notion_indexer.py      |  19 +++
 .../connector_indexers/obsidian_indexer.py    |  17 +++
 .../tasks/connector_indexers/slack_indexer.py |  18 +++
 .../tasks/connector_indexers/teams_indexer.py |  19 +++
 .../connector_indexers/webcrawler_indexer.py  |  17 +++
 .../hooks/use-indexing-connectors.ts          |  64 +++++++-
 surfsense_web/lib/electric/client.ts          |   3 +-
 27 files changed, 684 insertions(+), 7 deletions(-)
 create mode 100644 surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py

diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py
index 8858c2619..b77f5698e 100644
--- a/surfsense_backend/app/celery_app.py
+++ b/surfsense_backend/app/celery_app.py
@@ -79,6 +79,7 @@ celery_app = Celery(
         "app.tasks.celery_tasks.schedule_checker_task",
         "app.tasks.celery_tasks.blocknote_migration_tasks",
         "app.tasks.celery_tasks.document_reindex_tasks",
+        "app.tasks.celery_tasks.stale_notification_cleanup_task",
     ],
 )

@@ -121,4 +122,14 @@ celery_app.conf.beat_schedule = {
             "expires": 30,  # Task expires after 30 seconds if not picked up
         },
     },
+    # Clean up stale connector indexing notifications every 5 minutes.
+    # This detects tasks that crashed or timed out without proper cleanup
+    # and marks their notifications as failed so users don't see a perpetual "syncing" state.
+    "cleanup-stale-indexing-notifications": {
+        "task": "cleanup_stale_indexing_notifications",
+        "schedule": crontab(minute="*/5"),  # Every 5 minutes
+        "options": {
+            "expires": 60,  # Task expires after 60 seconds if not picked up
+        },
+    },
 }
diff --git a/surfsense_backend/app/connectors/composio_gmail_connector.py b/surfsense_backend/app/connectors/composio_gmail_connector.py
index 953e2e8fc..9bb1197b8 100644
--- a/surfsense_backend/app/connectors/composio_gmail_connector.py
+++ b/surfsense_backend/app/connectors/composio_gmail_connector.py
@@ -5,9 +5,15 @@ Provides Gmail specific methods for data retrieval and indexing via Composio.
""" import logging +import time +from collections.abc import Awaitable, Callable from datetime import UTC, datetime from typing import Any +# Heartbeat configuration +HeartbeatCallbackType = Callable[[int], Awaitable[None]] +HEARTBEAT_INTERVAL_SECONDS = 30 + from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from sqlalchemy.orm import selectinload @@ -427,6 +433,7 @@ async def index_composio_gmail( log_entry, update_last_indexed: bool = True, max_items: int = 1000, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str]: """Index Gmail messages via Composio with pagination and incremental processing.""" try: @@ -471,8 +478,16 @@ async def index_composio_gmail( total_documents_skipped = 0 total_messages_fetched = 0 result_size_estimate = None # Will be set from first API response + last_heartbeat_time = time.time() while total_messages_fetched < max_items: + # Send heartbeat periodically to indicate task is still alive + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(total_documents_indexed) + last_heartbeat_time = current_time + # Calculate how many messages to fetch in this batch remaining = max_items - total_messages_fetched current_batch_size = min(batch_size, remaining) diff --git a/surfsense_backend/app/connectors/composio_google_calendar_connector.py b/surfsense_backend/app/connectors/composio_google_calendar_connector.py index ec5b22b7f..669543210 100644 --- a/surfsense_backend/app/connectors/composio_google_calendar_connector.py +++ b/surfsense_backend/app/connectors/composio_google_calendar_connector.py @@ -5,9 +5,15 @@ Provides Google Calendar specific methods for data retrieval and indexing via Co """ import logging +import time +from collections.abc import Awaitable, Callable from datetime import UTC, datetime from typing import Any +# Heartbeat configuration +HeartbeatCallbackType = Callable[[int], Awaitable[None]] +HEARTBEAT_INTERVAL_SECONDS = 30 + from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select from sqlalchemy.orm import selectinload @@ -191,6 +197,7 @@ async def index_composio_google_calendar( log_entry, update_last_indexed: bool = True, max_items: int = 2500, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str]: """Index Google Calendar events via Composio.""" try: @@ -262,8 +269,15 @@ async def index_composio_google_calendar( duplicate_content_count = ( 0 # Track events skipped due to duplicate content_hash ) + last_heartbeat_time = time.time() for event in events: + # Send heartbeat periodically to indicate task is still alive + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time try: # Handle both standard Google API and potential Composio variations event_id = event.get("id", "") or event.get("eventId", "") diff --git a/surfsense_backend/app/connectors/composio_google_drive_connector.py b/surfsense_backend/app/connectors/composio_google_drive_connector.py index 5b8c4b993..debbced20 100644 --- a/surfsense_backend/app/connectors/composio_google_drive_connector.py +++ b/surfsense_backend/app/connectors/composio_google_drive_connector.py @@ -9,10 +9,16 @@ import json import logging import os import tempfile +import time +from collections.abc import Awaitable, Callable from datetime 
import UTC, datetime from pathlib import Path from typing import Any +# Heartbeat configuration +HeartbeatCallbackType = Callable[[int], Awaitable[None]] +HEARTBEAT_INTERVAL_SECONDS = 30 + from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.orm.attributes import flag_modified @@ -565,6 +571,7 @@ async def index_composio_google_drive( log_entry, update_last_indexed: bool = True, max_items: int = 1000, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int, str | None]: """Index Google Drive files via Composio with delta sync support. @@ -652,6 +659,7 @@ async def index_composio_google_drive( max_items=max_items, task_logger=task_logger, log_entry=log_entry, + on_heartbeat_callback=on_heartbeat_callback, ) else: logger.info( @@ -684,6 +692,7 @@ async def index_composio_google_drive( max_items=max_items, task_logger=task_logger, log_entry=log_entry, + on_heartbeat_callback=on_heartbeat_callback, ) # Get new page token for next sync (always update after successful sync) @@ -765,6 +774,7 @@ async def _index_composio_drive_delta_sync( max_items: int, task_logger: TaskLoggingService, log_entry, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int, list[str]]: """Index Google Drive files using delta sync (only changed files). @@ -774,6 +784,7 @@ async def _index_composio_drive_delta_sync( documents_indexed = 0 documents_skipped = 0 processing_errors = [] + last_heartbeat_time = time.time() # Fetch all changes with pagination all_changes = [] @@ -804,6 +815,13 @@ async def _index_composio_drive_delta_sync( logger.info(f"Processing {len(all_changes)} changes from delta sync") for change in all_changes[:max_items]: + # Send heartbeat periodically to indicate task is still alive + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + try: # Handle removed files is_removed = change.get("removed", False) @@ -886,11 +904,13 @@ async def _index_composio_drive_full_scan( max_items: int, task_logger: TaskLoggingService, log_entry, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int, list[str]]: """Index Google Drive files using full scan (first sync or when no delta token).""" documents_indexed = 0 documents_skipped = 0 processing_errors = [] + last_heartbeat_time = time.time() all_files = [] @@ -1001,6 +1021,13 @@ async def _index_composio_drive_full_scan( ) for file_info in all_files: + # Send heartbeat periodically to indicate task is still alive + if on_heartbeat_callback: + current_time = time.time() + if current_time - last_heartbeat_time >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = current_time + try: # Handle both standard Google API and potential Composio variations file_id = file_info.get("id", "") or file_info.get("fileId", "") diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index a27c2125c..678bf73c0 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -1137,6 +1137,7 @@ async def run_slack_indexing( end_date=end_date, indexing_function=index_slack_messages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1150,6 +1151,7 @@ async def 
_run_indexing_with_notifications( indexing_function, update_timestamp_func=None, supports_retry_callback: bool = False, + supports_heartbeat_callback: bool = False, ): """ Generic helper to run indexing with real-time notifications. @@ -1164,11 +1166,13 @@ async def _run_indexing_with_notifications( indexing_function: Async function that performs the indexing update_timestamp_func: Optional function to update connector timestamp supports_retry_callback: Whether the indexing function supports on_retry_callback + supports_heartbeat_callback: Whether the indexing function supports on_heartbeat_callback """ + from celery.exceptions import SoftTimeLimitExceeded from uuid import UUID notification = None - # Track indexed count for retry notifications + # Track indexed count for retry notifications and heartbeat current_indexed_count = 0 try: @@ -1227,6 +1231,27 @@ async def _run_indexing_with_notifications( # Don't let notification errors break the indexing logger.warning(f"Failed to update retry notification: {e}") + # Create heartbeat callback for connectors that support it + # This updates the notification periodically during long-running indexing loops + # to prevent the task from appearing stuck if the worker crashes + async def on_heartbeat_callback(indexed_count: int) -> None: + """Callback to update notification during indexing (heartbeat).""" + nonlocal notification, current_indexed_count + current_indexed_count = indexed_count + if notification: + try: + await session.refresh(notification) + await NotificationService.connector_indexing.notify_indexing_progress( + session=session, + notification=notification, + indexed_count=indexed_count, + stage="processing", + ) + await session.commit() + except Exception as e: + # Don't let notification errors break the indexing + logger.warning(f"Failed to update heartbeat notification: {e}") + # Build kwargs for indexing function indexing_kwargs = { "session": session, @@ -1242,6 +1267,10 @@ async def _run_indexing_with_notifications( if supports_retry_callback: indexing_kwargs["on_retry_callback"] = on_retry_callback + # Add heartbeat callback for connectors that support it + if supports_heartbeat_callback: + indexing_kwargs["on_heartbeat_callback"] = on_heartbeat_callback + # Run the indexing function # Some indexers return (indexed, error), others return (indexed, skipped, error) result = await indexing_function(**indexing_kwargs) @@ -1398,6 +1427,30 @@ async def _run_indexing_with_notifications( await ( session.commit() ) # Commit to ensure Electric SQL syncs the notification update + except SoftTimeLimitExceeded: + # Celery soft time limit was reached - task is about to be killed + # Gracefully save progress and mark as interrupted + logger.warning( + f"Soft time limit reached for connector {connector_id}. " + f"Saving partial progress: {current_indexed_count} items indexed." + ) + + if notification: + try: + await session.refresh(notification) + await NotificationService.connector_indexing.notify_indexing_completed( + session=session, + notification=notification, + indexed_count=current_indexed_count, + error_message="Time limit reached. Partial sync completed. 
Please run again for remaining items.", + is_warning=True, # Mark as warning since partial data was indexed + ) + await session.commit() + except Exception as notif_error: + logger.error(f"Failed to update notification on soft timeout: {notif_error!s}") + + # Re-raise so Celery knows the task was terminated + raise except Exception as e: logger.error(f"Error in indexing task: {e!s}", exc_info=True) @@ -1409,7 +1462,7 @@ async def _run_indexing_with_notifications( await NotificationService.connector_indexing.notify_indexing_completed( session=session, notification=notification, - indexed_count=0, + indexed_count=current_indexed_count, # Use tracked count, not 0 error_message=str(e), skipped_count=None, # Unknown on exception ) @@ -1439,6 +1492,7 @@ async def run_notion_indexing_with_new_session( indexing_function=index_notion_pages, update_timestamp_func=_update_connector_timestamp_by_id, supports_retry_callback=True, # Notion connector supports retry notifications + supports_heartbeat_callback=True, # Notion connector supports heartbeat notifications ) @@ -1471,6 +1525,7 @@ async def run_notion_indexing( indexing_function=index_notion_pages, update_timestamp_func=_update_connector_timestamp_by_id, supports_retry_callback=True, # Notion connector supports retry notifications + supports_heartbeat_callback=True, # Notion connector supports heartbeat notifications ) @@ -1521,6 +1576,7 @@ async def run_github_indexing( end_date=end_date, indexing_function=index_github_repos, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1571,6 +1627,7 @@ async def run_linear_indexing( end_date=end_date, indexing_function=index_linear_issues, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1620,6 +1677,7 @@ async def run_discord_indexing( end_date=end_date, indexing_function=index_discord_messages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1670,6 +1728,7 @@ async def run_teams_indexing( end_date=end_date, indexing_function=index_teams_messages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1720,6 +1779,7 @@ async def run_jira_indexing( end_date=end_date, indexing_function=index_jira_issues, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1772,6 +1832,7 @@ async def run_confluence_indexing( end_date=end_date, indexing_function=index_confluence_pages, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1822,6 +1883,7 @@ async def run_clickup_indexing( end_date=end_date, indexing_function=index_clickup_tasks, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1872,6 +1934,7 @@ async def run_airtable_indexing( end_date=end_date, indexing_function=index_airtable_records, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1924,6 +1987,7 @@ async def run_google_calendar_indexing( end_date=end_date, indexing_function=index_google_calendar_events, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -1998,6 +2062,7 @@ async def run_google_gmail_indexing( end_date=end_date, indexing_function=gmail_indexing_wrapper, update_timestamp_func=_update_connector_timestamp_by_id, + supports_heartbeat_callback=True, ) @@ -2206,6 +2271,7 @@ async def run_luma_indexing( end_date=end_date, 
        indexing_function=index_luma_events,
         update_timestamp_func=_update_connector_timestamp_by_id,
+        supports_heartbeat_callback=True,
     )


@@ -2257,6 +2323,7 @@ async def run_elasticsearch_indexing(
         end_date=end_date,
         indexing_function=index_elasticsearch_documents,
         update_timestamp_func=_update_connector_timestamp_by_id,
+        supports_heartbeat_callback=True,
     )


@@ -2306,6 +2373,7 @@ async def run_web_page_indexing(
         end_date=end_date,
         indexing_function=index_crawled_urls,
         update_timestamp_func=_update_connector_timestamp_by_id,
+        supports_heartbeat_callback=True,
     )


@@ -2360,6 +2428,7 @@ async def run_bookstack_indexing(
         end_date=end_date,
         indexing_function=index_bookstack_pages,
         update_timestamp_func=_update_connector_timestamp_by_id,
+        supports_heartbeat_callback=True,
     )


@@ -2412,6 +2481,7 @@ async def run_obsidian_indexing(
         end_date=end_date,
         indexing_function=index_obsidian_vault,
         update_timestamp_func=_update_connector_timestamp_by_id,
+        supports_heartbeat_callback=True,
     )


@@ -2465,6 +2535,7 @@ async def run_composio_indexing(
         end_date=end_date,
         indexing_function=index_composio_connector,
         update_timestamp_func=_update_connector_timestamp_by_id,
+        supports_heartbeat_callback=True,
     )


diff --git a/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py b/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py
new file mode 100644
index 000000000..ff162f70f
--- /dev/null
+++ b/surfsense_backend/app/tasks/celery_tasks/stale_notification_cleanup_task.py
@@ -0,0 +1,140 @@
+"""Celery task to detect and mark stale connector indexing notifications as failed.
+
+This task runs periodically (every 5 minutes by default) to find notifications
+that are stuck in "in_progress" status but haven't received a heartbeat update
+in the configured timeout period. These are marked as "failed" to prevent the
+frontend from showing a perpetual "syncing" state.
+"""
+
+import logging
+from datetime import UTC, datetime, timedelta
+
+from sqlalchemy import and_
+from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
+from sqlalchemy.future import select
+from sqlalchemy.orm.attributes import flag_modified
+from sqlalchemy.pool import NullPool
+
+from app.celery_app import celery_app
+from app.config import config
+from app.db import Notification
+
+logger = logging.getLogger(__name__)
+
+# Timeout in minutes - notifications without a heartbeat for this long are marked as failed.
+# Should be comfortably longer than HEARTBEAT_INTERVAL_SECONDS (30s):
+# 5 minutes allows 10 consecutive missed heartbeats before a task is declared stale.
+STALE_NOTIFICATION_TIMEOUT_MINUTES = 5
+
+
+def get_celery_session_maker():
+    """Create async session maker for Celery tasks."""
+    engine = create_async_engine(
+        config.DATABASE_URL,
+        poolclass=NullPool,
+        echo=False,
+    )
+    return async_sessionmaker(engine, expire_on_commit=False)
+
+
+@celery_app.task(name="cleanup_stale_indexing_notifications")
+def cleanup_stale_indexing_notifications_task():
+    """
+    Check for stale connector indexing notifications and mark them as failed.
+
+    This task finds notifications that:
+    - Have type = 'connector_indexing'
+    - Have notification_metadata.status = 'in_progress'
+    - Have updated_at older than STALE_NOTIFICATION_TIMEOUT_MINUTES
+
+    And marks them as failed with an appropriate error message.
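
A note on the two timing constants this task depends on: with connectors emitting a heartbeat at most every HEARTBEAT_INTERVAL_SECONDS (30 s) and this module declaring staleness after STALE_NOTIFICATION_TIMEOUT_MINUTES (5 min), a notification is only marked failed after roughly ten consecutive missed heartbeats. A minimal sketch of that relationship, using the values from this patch:

    # Values as defined in this patch; the assert documents the intended
    # tolerance, it is not runtime enforcement.
    HEARTBEAT_INTERVAL_SECONDS = 30
    STALE_NOTIFICATION_TIMEOUT_MINUTES = 5

    missed_beats_before_stale = (
        STALE_NOTIFICATION_TIMEOUT_MINUTES * 60 // HEARTBEAT_INTERVAL_SECONDS
    )
    assert missed_beats_before_stale == 10

Outside the beat schedule, the task can also be kicked off ad hoc by its registered name, e.g. celery_app.send_task("cleanup_stale_indexing_notifications").
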
+ """ + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete(_cleanup_stale_notifications()) + finally: + loop.close() + + +async def _cleanup_stale_notifications(): + """Find and mark stale connector indexing notifications as failed.""" + async with get_celery_session_maker()() as session: + try: + # Calculate the cutoff time + cutoff_time = datetime.now(UTC) - timedelta( + minutes=STALE_NOTIFICATION_TIMEOUT_MINUTES + ) + + # Find stale notifications: + # - type = 'connector_indexing' + # - metadata->>'status' = 'in_progress' + # - updated_at < cutoff_time + result = await session.execute( + select(Notification).filter( + and_( + Notification.type == "connector_indexing", + Notification.notification_metadata["status"].astext == "in_progress", + Notification.updated_at < cutoff_time, + ) + ) + ) + stale_notifications = result.scalars().all() + + if not stale_notifications: + logger.debug("No stale connector indexing notifications found") + return + + logger.warning( + f"Found {len(stale_notifications)} stale connector indexing notifications " + f"(no heartbeat for >{STALE_NOTIFICATION_TIMEOUT_MINUTES} minutes)" + ) + + # Mark each stale notification as failed + for notification in stale_notifications: + try: + # Get current indexed count from metadata if available + indexed_count = notification.notification_metadata.get("indexed_count", 0) + connector_name = notification.notification_metadata.get("connector_name", "Unknown") + + # Calculate how long it's been stale + stale_duration = datetime.now(UTC) - notification.updated_at + stale_minutes = int(stale_duration.total_seconds() / 60) + + # Update notification metadata + notification.notification_metadata["status"] = "failed" + notification.notification_metadata["completed_at"] = datetime.now(UTC).isoformat() + notification.notification_metadata["error_message"] = ( + f"Indexing task appears to have crashed or timed out. " + f"No activity detected for {stale_minutes} minutes. " + f"Please try syncing again." + ) + + # Flag the JSONB column as modified for SQLAlchemy to detect the change + flag_modified(notification, "notification_metadata") + + logger.info( + f"Marking notification {notification.id} for connector '{connector_name}' as failed " + f"(stale for {stale_minutes} minutes, indexed {indexed_count} items before failure)" + ) + + except Exception as e: + logger.error( + f"Error marking notification {notification.id} as failed: {e!s}", + exc_info=True, + ) + continue + + # Commit all changes + await session.commit() + logger.info( + f"Successfully marked {len(stale_notifications)} stale notifications as failed" + ) + + except Exception as e: + logger.error(f"Error cleaning up stale notifications: {e!s}", exc_info=True) + await session.rollback() + diff --git a/surfsense_backend/app/tasks/composio_indexer.py b/surfsense_backend/app/tasks/composio_indexer.py index ffc4a1f27..49764fd98 100644 --- a/surfsense_backend/app/tasks/composio_indexer.py +++ b/surfsense_backend/app/tasks/composio_indexer.py @@ -9,8 +9,12 @@ to avoid circular import issues with the connector_indexers package. 
""" import logging +from collections.abc import Awaitable, Callable from importlib import import_module +# Type alias for heartbeat callback function +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession from sqlalchemy.future import select @@ -86,6 +90,7 @@ async def index_composio_connector( end_date: str | None = None, update_last_indexed: bool = True, max_items: int = 1000, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int, str | None]: """ Index content from a Composio connector. @@ -102,6 +107,7 @@ async def index_composio_connector( end_date: End date for filtering (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp max_items: Maximum number of items to fetch + on_heartbeat_callback: Optional callback to report progress for heartbeat updates Returns: Tuple of (number_of_indexed_items, number_of_skipped_items, error_message or None) @@ -180,6 +186,7 @@ async def index_composio_connector( "log_entry": log_entry, "update_last_indexed": update_last_indexed, "max_items": max_items, + "on_heartbeat_callback": on_heartbeat_callback, } # Add date params for toolkits that support them diff --git a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py index 6bb62d716..ab9e5d678 100644 --- a/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/airtable_indexer.py @@ -2,6 +2,9 @@ Airtable connector indexer. """ +import time +from collections.abc import Awaitable, Callable + from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession @@ -17,6 +20,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( calculate_date_range, check_document_by_unique_identifier, @@ -37,6 +46,7 @@ async def index_airtable_records( end_date: str | None = None, max_records: int = 2500, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Airtable records for a given connector. @@ -50,6 +60,7 @@ async def index_airtable_records( end_date: End date for filtering records (YYYY-MM-DD) max_records: Maximum number of records to fetch per table update_last_indexed: Whether to update the last_indexed_at timestamp + on_heartbeat_callback: Optional callback to update notification during long-running indexing. 
Returns: Tuple of (number_of_documents_processed, error_message) @@ -127,8 +138,16 @@ async def index_airtable_records( logger.info(f"Found {len(bases)} Airtable bases to process") + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + total_documents_indexed = 0 + # Process each base for base in bases: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(total_documents_indexed) + last_heartbeat_time = time.time() base_id = base.get("id") base_name = base.get("name", "Unknown Base") @@ -204,6 +223,11 @@ async def index_airtable_records( documents_skipped = 0 # Process each record for record in records: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(total_documents_indexed) + last_heartbeat_time = time.time() + try: # Generate markdown content markdown_content = ( diff --git a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py index e183ab333..90232809c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/bookstack_indexer.py @@ -2,6 +2,8 @@ BookStack connector indexer. """ +import time +from collections.abc import Awaitable, Callable from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -19,6 +21,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( calculate_date_range, check_document_by_unique_identifier, @@ -38,6 +46,7 @@ async def index_bookstack_pages( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index BookStack pages. @@ -50,6 +59,7 @@ async def index_bookstack_pages( start_date: Start date for indexing (YYYY-MM-DD format) end_date: End date for indexing (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. Returns: Tuple containing (number of documents indexed, error message or None) @@ -179,7 +189,14 @@ async def index_bookstack_pages( skipped_pages = [] documents_skipped = 0 + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + for page in pages: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() try: page_id = page.get("id") page_name = page.get("name", "") diff --git a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py index 887c3e2e5..2b95b6a11 100644 --- a/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/clickup_indexer.py @@ -3,6 +3,8 @@ ClickUp connector indexer. 
""" import contextlib +import time +from collections.abc import Awaitable, Callable from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -20,6 +22,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( check_document_by_unique_identifier, check_duplicate_document_by_hash, @@ -38,6 +46,7 @@ async def index_clickup_tasks( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index tasks from ClickUp workspace. @@ -50,6 +59,7 @@ async def index_clickup_tasks( start_date: Start date for filtering tasks (YYYY-MM-DD format) end_date: End date for filtering tasks (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp + on_heartbeat_callback: Optional callback to update notification during long-running indexing. Returns: Tuple of (number of indexed tasks, error message if any) @@ -132,6 +142,9 @@ async def index_clickup_tasks( documents_indexed = 0 documents_skipped = 0 + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + # Iterate workspaces and fetch tasks for workspace in workspaces: workspace_id = workspace.get("id") @@ -170,6 +183,11 @@ async def index_clickup_tasks( ) for task in tasks: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() + try: task_id = task.get("id") task_name = task.get("name", "Untitled Task") diff --git a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py index 5673839bb..078aacf86 100644 --- a/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/confluence_indexer.py @@ -3,6 +3,8 @@ Confluence connector indexer. """ import contextlib +import time +from collections.abc import Awaitable, Callable from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -20,6 +22,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( calculate_date_range, check_document_by_unique_identifier, @@ -39,6 +47,7 @@ async def index_confluence_pages( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Confluence pages and comments. @@ -51,6 +60,7 @@ async def index_confluence_pages( start_date: Start date for indexing (YYYY-MM-DD format) end_date: End date for indexing (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. 
Returns: Tuple containing (number of documents indexed, error message or None) @@ -175,7 +185,14 @@ async def index_confluence_pages( skipped_pages = [] documents_skipped = 0 + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + for page in pages: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() try: page_id = page.get("id") page_title = page.get("title", "") diff --git a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py index 9e401b335..4bbeff125 100644 --- a/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/discord_indexer.py @@ -3,6 +3,8 @@ Discord connector indexer. """ import asyncio +import time +from collections.abc import Awaitable, Callable from datetime import UTC, datetime, timedelta from sqlalchemy.exc import SQLAlchemyError @@ -28,6 +30,12 @@ from .base import ( update_connector_last_indexed, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds - update notification every 30 seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + async def index_discord_messages( session: AsyncSession, @@ -37,6 +45,7 @@ async def index_discord_messages( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Discord messages from all accessible channels. @@ -49,6 +58,8 @@ async def index_discord_messages( start_date: Start date for indexing (YYYY-MM-DD format) end_date: End date for indexing (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. + Called periodically with (indexed_count) to prevent task appearing stuck. 
Returns: Tuple containing (number of documents indexed, error message or None) @@ -281,6 +292,9 @@ async def index_discord_messages( documents_skipped = 0 skipped_channels: list[str] = [] + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + # Process each guild and channel await task_logger.log_task_progress( log_entry, @@ -290,6 +304,10 @@ async def index_discord_messages( try: for guild in guilds: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() guild_id = guild["id"] guild_name = guild["name"] logger.info(f"Processing guild: {guild_name} ({guild_id})") diff --git a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py index 6a18af83b..49d82df0e 100644 --- a/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/elasticsearch_indexer.py @@ -4,6 +4,8 @@ Elasticsearch indexer for SurfSense import json import logging +import time +from collections.abc import Awaitable, Callable from datetime import UTC, datetime from typing import Any @@ -19,6 +21,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( check_document_by_unique_identifier, check_duplicate_document_by_hash, @@ -36,6 +44,7 @@ async def index_elasticsearch_documents( start_date: str, end_date: str, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index documents from Elasticsearch into SurfSense @@ -48,6 +57,7 @@ async def index_elasticsearch_documents( start_date: Start date for indexing (not used for Elasticsearch, kept for compatibility) end_date: End date for indexing (not used for Elasticsearch, kept for compatibility) update_last_indexed: Whether to update the last indexed timestamp + on_heartbeat_callback: Optional callback to update notification during long-running indexing. 
Returns: Tuple of (number of documents processed, error message if any) @@ -155,6 +165,9 @@ async def index_elasticsearch_documents( documents_processed = 0 + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + try: await task_logger.log_task_progress( log_entry, @@ -172,6 +185,11 @@ async def index_elasticsearch_documents( size=min(max_documents, 100), # Scroll in batches fields=config.get("ELASTICSEARCH_FIELDS"), ): + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_processed) + last_heartbeat_time = time.time() + if documents_processed >= max_documents: break diff --git a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py index fb6989bb9..75e7f516c 100644 --- a/surfsense_backend/app/tasks/connector_indexers/github_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/github_indexer.py @@ -5,6 +5,8 @@ This indexer processes entire repository digests in one pass, dramatically reducing LLM API calls compared to the previous file-by-file approach. """ +import time +from collections.abc import Awaitable, Callable from datetime import UTC, datetime from sqlalchemy.exc import SQLAlchemyError @@ -22,6 +24,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds - update notification every 30 seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( check_document_by_unique_identifier, check_duplicate_document_by_hash, @@ -43,6 +51,7 @@ async def index_github_repos( start_date: str | None = None, # Ignored - GitHub indexes full repo snapshots end_date: str | None = None, # Ignored - GitHub indexes full repo snapshots update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index GitHub repositories using gitingest for efficient processing. @@ -62,6 +71,7 @@ async def index_github_repos( start_date: Ignored - kept for API compatibility end_date: Ignored - kept for API compatibility update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. Returns: Tuple containing (number of documents indexed, error message or None) @@ -168,7 +178,15 @@ async def index_github_repos( f"Starting gitingest indexing for {len(repo_full_names_to_index)} repositories." 
) + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + documents_indexed = 0 + for repo_full_name in repo_full_names_to_index: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() if not repo_full_name or not isinstance(repo_full_name, str): logger.warning(f"Skipping invalid repository entry: {repo_full_name}") continue diff --git a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py index 81d33b5e2..cef2e15f1 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_calendar_indexer.py @@ -2,6 +2,8 @@ Google Calendar connector indexer. """ +import time +from collections.abc import Awaitable, Callable from datetime import datetime, timedelta import pytz @@ -21,6 +23,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( check_document_by_unique_identifier, check_duplicate_document_by_hash, @@ -39,6 +47,7 @@ async def index_google_calendar_events( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Google Calendar events. @@ -52,6 +61,7 @@ async def index_google_calendar_events( end_date: End date for indexing (YYYY-MM-DD format). Can be in the future to index upcoming events. Defaults to today if not provided. update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. 
Returns: Tuple containing (number of documents indexed, error message or None) @@ -281,7 +291,14 @@ async def index_google_calendar_events( 0 # Track events skipped due to duplicate content_hash ) + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + for event in events: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() try: event_id = event.get("id") event_summary = event.get("summary", "No Title") diff --git a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py index f50e149d3..98df68cd1 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_drive_indexer.py @@ -1,6 +1,8 @@ """Google Drive indexer using Surfsense file processors.""" import logging +import time +from collections.abc import Awaitable, Callable from sqlalchemy.exc import SQLAlchemyError from sqlalchemy.ext.asyncio import AsyncSession @@ -24,6 +26,12 @@ from app.tasks.connector_indexers.base import ( ) from app.utils.document_converters import generate_unique_identifier_hash +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + logger = logging.getLogger(__name__) @@ -38,6 +46,7 @@ async def index_google_drive_files( update_last_indexed: bool = True, max_files: int = 500, include_subfolders: bool = False, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Google Drive files for a specific connector. @@ -53,6 +62,7 @@ async def index_google_drive_files( update_last_indexed: Whether to update last_indexed_at timestamp max_files: Maximum number of files to index include_subfolders: Whether to recursively index files in subfolders + on_heartbeat_callback: Optional callback to update notification during long-running indexing. 
Returns: Tuple of (number_of_indexed_files, error_message) @@ -147,6 +157,7 @@ async def index_google_drive_files( log_entry=log_entry, max_files=max_files, include_subfolders=include_subfolders, + on_heartbeat_callback=on_heartbeat_callback, ) else: logger.info(f"Using full scan for connector {connector_id}") @@ -163,6 +174,7 @@ async def index_google_drive_files( log_entry=log_entry, max_files=max_files, include_subfolders=include_subfolders, + on_heartbeat_callback=on_heartbeat_callback, ) documents_indexed, documents_skipped = result @@ -383,6 +395,7 @@ async def _index_full_scan( log_entry: any, max_files: int, include_subfolders: bool = False, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int]: """Perform full scan indexing of a folder.""" await task_logger.log_task_progress( @@ -399,10 +412,17 @@ async def _index_full_scan( documents_skipped = 0 files_processed = 0 + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + # Queue of folders to process: (folder_id, folder_name) folders_to_process = [(folder_id, folder_name)] while folders_to_process and files_processed < max_files: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() current_folder_id, current_folder_name = folders_to_process.pop(0) logger.info(f"Processing folder: {current_folder_name} ({current_folder_id})") page_token = None @@ -485,6 +505,7 @@ async def _index_with_delta_sync( log_entry: any, max_files: int, include_subfolders: bool = False, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, int]: """Perform delta sync indexing using change tracking. @@ -515,7 +536,14 @@ async def _index_with_delta_sync( documents_skipped = 0 files_processed = 0 + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + for change in changes: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() if files_processed >= max_files: break diff --git a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py index e599abd22..34e5a9530 100644 --- a/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/google_gmail_indexer.py @@ -2,6 +2,8 @@ Google Gmail connector indexer. """ +import time +from collections.abc import Awaitable, Callable from datetime import datetime from google.oauth2.credentials import Credentials @@ -23,6 +25,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( calculate_date_range, check_document_by_unique_identifier, @@ -43,6 +51,7 @@ async def index_google_gmail_messages( end_date: str | None = None, update_last_indexed: bool = True, max_messages: int = 1000, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str]: """ Index Gmail messages for a specific connector. 
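
One behavior worth knowing when exercising these hooks: each indexer arms the gate with last_heartbeat_time = time.time() before its loop starts, so the first beat fires only after a full interval has elapsed. A quick way to observe the emissions is to record what the callback receives (illustrative; assumes the usual session/ID variables in scope):

    beats: list[int] = []

    async def record_beat(indexed_count: int) -> None:
        beats.append(indexed_count)

    indexed, status = await index_google_gmail_messages(
        session=session,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        on_heartbeat_callback=record_beat,
    )
    # For runs shorter than HEARTBEAT_INTERVAL_SECONDS, beats may
    # legitimately remain empty: the gate fires on elapsed time, not item count.
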
@@ -56,6 +65,7 @@ async def index_google_gmail_messages( end_date: End date for filtering messages (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) max_messages: Maximum number of messages to fetch (default: 100) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. Returns: Tuple of (number_of_indexed_messages, status_message) @@ -212,7 +222,15 @@ async def index_google_gmail_messages( documents_indexed = 0 skipped_messages = [] documents_skipped = 0 + + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + for message in messages: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() try: # Extract message information message_id = message.get("id", "") diff --git a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py index d6095d20e..ab36ae7d0 100644 --- a/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/jira_indexer.py @@ -3,6 +3,8 @@ Jira connector indexer. """ import contextlib +import time +from collections.abc import Awaitable, Callable from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -20,6 +22,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds - update notification every 30 seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( calculate_date_range, check_document_by_unique_identifier, @@ -39,6 +47,7 @@ async def index_jira_issues( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Jira issues and comments. @@ -51,6 +60,7 @@ async def index_jira_issues( start_date: Start date for indexing (YYYY-MM-DD format) end_date: End date for indexing (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. Returns: Tuple containing (number of documents indexed, error message or None) @@ -169,7 +179,14 @@ async def index_jira_issues( skipped_issues = [] documents_skipped = 0 + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + for issue in issues: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() try: issue_id = issue.get("key") issue_identifier = issue.get("key", "") diff --git a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py index d00a39160..549aa0224 100644 --- a/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/linear_indexer.py @@ -2,6 +2,8 @@ Linear connector indexer. 
""" +import time +from collections.abc import Awaitable, Callable from datetime import datetime from sqlalchemy.exc import SQLAlchemyError @@ -19,6 +21,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds - update notification every 30 seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( calculate_date_range, check_document_by_unique_identifier, @@ -38,6 +46,7 @@ async def index_linear_issues( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Linear issues and comments. @@ -50,6 +59,7 @@ async def index_linear_issues( start_date: Start date for indexing (YYYY-MM-DD format) end_date: End date for indexing (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. Returns: Tuple containing (number of documents indexed, error message or None) @@ -188,6 +198,9 @@ async def index_linear_issues( documents_skipped = 0 skipped_issues = [] + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + await task_logger.log_task_progress( log_entry, f"Starting to process {len(issues)} Linear issues", @@ -196,6 +209,11 @@ async def index_linear_issues( # Process each issue for issue in issues: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() + try: issue_id = issue.get("id", "") issue_identifier = issue.get("identifier", "") diff --git a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py index 59890dbe4..22fd6d468 100644 --- a/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/luma_indexer.py @@ -2,6 +2,8 @@ Luma connector indexer. """ +import time +from collections.abc import Awaitable, Callable from datetime import datetime, timedelta from sqlalchemy.exc import SQLAlchemyError @@ -19,6 +21,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( check_document_by_unique_identifier, check_duplicate_document_by_hash, @@ -37,6 +45,7 @@ async def index_luma_events( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Luma events. @@ -50,6 +59,7 @@ async def index_luma_events( end_date: End date for indexing (YYYY-MM-DD format). Can be in the future to index upcoming events. Defaults to today if not provided. update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. 
Returns: Tuple containing (number of documents indexed, error message or None) @@ -221,7 +231,14 @@ async def index_luma_events( documents_skipped = 0 skipped_events = [] + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + for event in events: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() try: # Luma event structure fields - events have nested 'event' field event_data = event.get("event", {}) diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py index a65bf84a7..88779db57 100644 --- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py @@ -2,6 +2,7 @@ Notion connector indexer. """ +import time from collections.abc import Awaitable, Callable from datetime import datetime @@ -34,6 +35,13 @@ from .base import ( # Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]] +# Type alias for heartbeat callback +# Signature: async callback(indexed_count) -> None +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds - update notification every 30 seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + async def index_notion_pages( session: AsyncSession, @@ -44,6 +52,7 @@ async def index_notion_pages( end_date: str | None = None, update_last_indexed: bool = True, on_retry_callback: RetryCallbackType | None = None, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Notion pages from all accessible pages. @@ -59,6 +68,8 @@ async def index_notion_pages( on_retry_callback: Optional callback for retry progress notifications. Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) retry_reason is one of: 'rate_limit', 'server_error', 'timeout' + on_heartbeat_callback: Optional callback to update notification during long-running indexing. + Called periodically with (indexed_count) to prevent task appearing stuck. 
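
Notion is the only indexer that now accepts both callbacks, and they cover different failure modes: on_retry_callback reports known waits (rate limits, server errors, timeouts), while on_heartbeat_callback reports liveness between those events. A sketch wiring both, with logging standing in for the notification service (session and the ID variables assumed in scope):

    import logging

    logger = logging.getLogger(__name__)

    async def on_retry(
        retry_reason: str, attempt: int, max_attempts: int, wait_seconds: float
    ) -> None:
        logger.info(
            "Notion %s: retry %d/%d in %.1fs",
            retry_reason, attempt, max_attempts, wait_seconds,
        )

    async def on_heartbeat(indexed_count: int) -> None:
        logger.info("Notion indexing alive, %d pages so far", indexed_count)

    indexed, error = await index_notion_pages(
        session=session,
        connector_id=connector_id,
        search_space_id=search_space_id,
        user_id=user_id,
        on_retry_callback=on_retry,
        on_heartbeat_callback=on_heartbeat,
    )
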
diff --git a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
index a65bf84a7..88779db57 100644
--- a/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/notion_indexer.py
@@ -2,6 +2,7 @@
 Notion connector indexer.
 """
 
+import time
 from collections.abc import Awaitable, Callable
 from datetime import datetime
 
@@ -34,6 +35,13 @@ from .base import (
 # Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds) -> None
 RetryCallbackType = Callable[[str, int, int, float], Awaitable[None]]
 
+# Type alias for heartbeat callback
+# Signature: async callback(indexed_count) -> None
+HeartbeatCallbackType = Callable[[int], Awaitable[None]]
+
+# Heartbeat interval in seconds - update notification every 30 seconds
+HEARTBEAT_INTERVAL_SECONDS = 30
+
 
 async def index_notion_pages(
     session: AsyncSession,
@@ -44,6 +52,7 @@
     end_date: str | None = None,
     update_last_indexed: bool = True,
     on_retry_callback: RetryCallbackType | None = None,
+    on_heartbeat_callback: HeartbeatCallbackType | None = None,
 ) -> tuple[int, str | None]:
     """
     Index Notion pages from all accessible pages.
@@ -59,6 +68,8 @@
         on_retry_callback: Optional callback for retry progress notifications.
             Signature: async callback(retry_reason, attempt, max_attempts, wait_seconds)
             retry_reason is one of: 'rate_limit', 'server_error', 'timeout'
+        on_heartbeat_callback: Optional callback to update notification during long-running indexing.
+            Called periodically with (indexed_count) to prevent task appearing stuck.
 
     Returns:
         Tuple containing (number of documents indexed, error message or None)
@@ -211,6 +222,9 @@
     documents_skipped = 0
     skipped_pages = []
 
+    # Heartbeat tracking - update notification periodically to prevent appearing stuck
+    last_heartbeat_time = time.time()
+
     await task_logger.log_task_progress(
         log_entry,
         f"Starting to process {len(pages)} Notion pages",
@@ -219,6 +233,11 @@
 
     # Process each page
     for page in pages:
+        # Check if it's time for a heartbeat update
+        if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
+            await on_heartbeat_callback(documents_indexed)
+            last_heartbeat_time = time.time()
+
         try:
             page_id = page.get("page_id")
             page_title = page.get("title", f"Untitled page ({page_id})")
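Notion is the one indexer that now takes two optional callbacks. They are independent channels: retries report transient API backoff, heartbeats report liveness. A hedged wiring sketch (the `logger` calls are placeholders for whatever notification update the dispatching task performs):

    async def on_retry(reason: str, attempt: int, max_attempts: int, wait_seconds: float) -> None:
        logger.warning(
            "Notion retry (%s): attempt %d/%d, waiting %.1fs",
            reason, attempt, max_attempts, wait_seconds,
        )

    async def on_heartbeat(indexed_count: int) -> None:
        logger.info("Notion indexing alive: %d pages indexed", indexed_count)

    # Passed alongside the existing arguments (elided here):
    # await index_notion_pages(..., on_retry_callback=on_retry, on_heartbeat_callback=on_heartbeat)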
""" +import time +from collections.abc import Awaitable, Callable from datetime import datetime from slack_sdk.errors import SlackApiError @@ -18,6 +20,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds - update notification every 30 seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( build_document_metadata_markdown, calculate_date_range, @@ -38,6 +46,7 @@ async def index_slack_messages( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Slack messages from all accessible channels. @@ -50,6 +59,8 @@ async def index_slack_messages( start_date: Start date for indexing (YYYY-MM-DD format) end_date: End date for indexing (YYYY-MM-DD format) update_last_indexed: Whether to update the last_indexed_at timestamp (default: True) + on_heartbeat_callback: Optional callback to update notification during long-running indexing. + Called periodically with (indexed_count) to prevent task appearing stuck. Returns: Tuple containing (number of documents indexed, error message or None) @@ -164,6 +175,9 @@ async def index_slack_messages( documents_skipped = 0 skipped_channels = [] + # Heartbeat tracking - update notification periodically to prevent appearing stuck + last_heartbeat_time = time.time() + await task_logger.log_task_progress( log_entry, f"Starting to process {len(channels)} Slack channels", @@ -172,6 +186,10 @@ async def index_slack_messages( # Process each channel for channel_obj in channels: + # Check if it's time for a heartbeat update + if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS: + await on_heartbeat_callback(documents_indexed) + last_heartbeat_time = time.time() channel_id = channel_obj["id"] channel_name = channel_obj["name"] is_private = channel_obj["is_private"] diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py index 66b709ddc..55bb02ab9 100644 --- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py +++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py @@ -2,6 +2,8 @@ Microsoft Teams connector indexer. """ +import time +from collections.abc import Awaitable, Callable from datetime import UTC from sqlalchemy.exc import SQLAlchemyError @@ -17,6 +19,12 @@ from app.utils.document_converters import ( generate_unique_identifier_hash, ) +# Type hint for heartbeat callback +HeartbeatCallbackType = Callable[[int], Awaitable[None]] + +# Heartbeat interval in seconds - update notification every 30 seconds +HEARTBEAT_INTERVAL_SECONDS = 30 + from .base import ( build_document_metadata_markdown, calculate_date_range, @@ -37,6 +45,7 @@ async def index_teams_messages( start_date: str | None = None, end_date: str | None = None, update_last_indexed: bool = True, + on_heartbeat_callback: HeartbeatCallbackType | None = None, ) -> tuple[int, str | None]: """ Index Microsoft Teams messages from all accessible teams and channels. 
diff --git a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py
index f244c97f8..4ac87164c 100644
--- a/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/slack_indexer.py
@@ -2,6 +2,8 @@
 Slack connector indexer.
 """
 
+import time
+from collections.abc import Awaitable, Callable
 from datetime import datetime
 
 from slack_sdk.errors import SlackApiError
@@ -18,6 +20,12 @@ from app.utils.document_converters import (
     generate_unique_identifier_hash,
 )
 
+# Type hint for heartbeat callback
+HeartbeatCallbackType = Callable[[int], Awaitable[None]]
+
+# Heartbeat interval in seconds - update notification every 30 seconds
+HEARTBEAT_INTERVAL_SECONDS = 30
+
 from .base import (
     build_document_metadata_markdown,
     calculate_date_range,
@@ -38,6 +46,7 @@ async def index_slack_messages(
     start_date: str | None = None,
     end_date: str | None = None,
     update_last_indexed: bool = True,
+    on_heartbeat_callback: HeartbeatCallbackType | None = None,
 ) -> tuple[int, str | None]:
     """
     Index Slack messages from all accessible channels.
@@ -50,6 +59,8 @@
         start_date: Start date for indexing (YYYY-MM-DD format)
         end_date: End date for indexing (YYYY-MM-DD format)
         update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
+        on_heartbeat_callback: Optional callback to update notification during long-running indexing.
+            Called periodically with (indexed_count) to prevent task appearing stuck.
 
     Returns:
         Tuple containing (number of documents indexed, error message or None)
@@ -164,6 +175,9 @@
     documents_skipped = 0
     skipped_channels = []
 
+    # Heartbeat tracking - update notification periodically to prevent appearing stuck
+    last_heartbeat_time = time.time()
+
     await task_logger.log_task_progress(
         log_entry,
         f"Starting to process {len(channels)} Slack channels",
@@ -172,6 +186,10 @@
 
     # Process each channel
     for channel_obj in channels:
+        # Check if it's time for a heartbeat update
+        if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
+            await on_heartbeat_callback(documents_indexed)
+            last_heartbeat_time = time.time()
         channel_id = channel_obj["id"]
         channel_name = channel_obj["name"]
         is_private = channel_obj["is_private"]
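Because the throttle condition is pure time arithmetic, it is easy to unit-test with a fake clock. A hypothetical pytest sketch against the HeartbeatThrottle helper sketched earlier (assumes pytest-asyncio is installed; nothing like this exists in the patch):

    import time

    import pytest

    @pytest.mark.asyncio
    async def test_throttle_respects_interval(monkeypatch):
        calls: list[int] = []

        async def record(count: int) -> None:
            calls.append(count)

        now = [0.0]
        monkeypatch.setattr(time, "time", lambda: now[0])

        throttle = HeartbeatThrottle(record, interval=30.0)
        await throttle.beat(1)  # interval not yet elapsed: no call
        now[0] += 31.0
        await throttle.beat(2)  # interval elapsed: callback fires once
        assert calls == [2]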
diff --git a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py
index 66b709ddc..55bb02ab9 100644
--- a/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/teams_indexer.py
@@ -2,6 +2,8 @@
 Microsoft Teams connector indexer.
 """
 
+import time
+from collections.abc import Awaitable, Callable
 from datetime import UTC
 
 from sqlalchemy.exc import SQLAlchemyError
@@ -17,6 +19,12 @@ from app.utils.document_converters import (
     generate_unique_identifier_hash,
 )
 
+# Type hint for heartbeat callback
+HeartbeatCallbackType = Callable[[int], Awaitable[None]]
+
+# Heartbeat interval in seconds - update notification every 30 seconds
+HEARTBEAT_INTERVAL_SECONDS = 30
+
 from .base import (
     build_document_metadata_markdown,
     calculate_date_range,
@@ -37,6 +45,7 @@ async def index_teams_messages(
     start_date: str | None = None,
     end_date: str | None = None,
     update_last_indexed: bool = True,
+    on_heartbeat_callback: HeartbeatCallbackType | None = None,
 ) -> tuple[int, str | None]:
     """
     Index Microsoft Teams messages from all accessible teams and channels.
@@ -49,6 +58,8 @@
         start_date: Start date for indexing (YYYY-MM-DD format)
         end_date: End date for indexing (YYYY-MM-DD format)
         update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
+        on_heartbeat_callback: Optional callback to update notification during long-running indexing.
+            Called periodically with (indexed_count) to prevent task appearing stuck.
 
     Returns:
         Tuple containing (number of documents indexed, error message or None)
@@ -161,6 +172,9 @@
     documents_skipped = 0
     skipped_channels = []
 
+    # Heartbeat tracking - update notification periodically to prevent appearing stuck
+    last_heartbeat_time = time.time()
+
     await task_logger.log_task_progress(
         log_entry,
         f"Starting to process {len(teams)} Teams",
@@ -185,6 +199,11 @@
 
     # Process each team
     for team in teams:
+        # Check if it's time for a heartbeat update
+        if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
+            await on_heartbeat_callback(documents_indexed)
+            last_heartbeat_time = time.time()
+
         team_id = team.get("id")
         team_name = team.get("displayName", "Unknown Team")
 
diff --git a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py
index 0c63fd2f0..ae89b7513 100644
--- a/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py
+++ b/surfsense_backend/app/tasks/connector_indexers/webcrawler_indexer.py
@@ -2,6 +2,8 @@
 Webcrawler connector indexer.
 """
 
+import time
+from collections.abc import Awaitable, Callable
 from datetime import datetime
 
 from sqlalchemy.exc import SQLAlchemyError
@@ -20,6 +22,12 @@ from app.utils.document_converters import (
 )
 from app.utils.webcrawler_utils import parse_webcrawler_urls
 
+# Type hint for heartbeat callback
+HeartbeatCallbackType = Callable[[int], Awaitable[None]]
+
+# Heartbeat interval in seconds
+HEARTBEAT_INTERVAL_SECONDS = 30
+
 from .base import (
     check_document_by_unique_identifier,
     check_duplicate_document_by_hash,
@@ -38,6 +46,7 @@ async def index_crawled_urls(
     start_date: str | None = None,
     end_date: str | None = None,
     update_last_indexed: bool = True,
+    on_heartbeat_callback: HeartbeatCallbackType | None = None,
 ) -> tuple[int, str | None]:
     """
     Index web page URLs.
@@ -50,6 +59,7 @@
         start_date: Start date for filtering (YYYY-MM-DD format) - optional
         end_date: End date for filtering (YYYY-MM-DD format) - optional
         update_last_indexed: Whether to update the last_indexed_at timestamp (default: True)
+        on_heartbeat_callback: Optional callback to update notification during long-running indexing.
 
     Returns:
         Tuple containing (number of documents indexed, error message or None)
@@ -140,7 +150,14 @@
     documents_skipped = 0
     failed_urls = []
 
+    # Heartbeat tracking - update notification periodically to prevent appearing stuck
+    last_heartbeat_time = time.time()
+
     for idx, url in enumerate(urls, 1):
+        # Check if it's time for a heartbeat update
+        if on_heartbeat_callback and (time.time() - last_heartbeat_time) >= HEARTBEAT_INTERVAL_SECONDS:
+            await on_heartbeat_callback(documents_indexed)
+            last_heartbeat_time = time.time()
         try:
             logger.info(f"Processing URL {idx}/{len(urls)}: {url}")
 
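The frontend hook below decides staleness purely from the notification row's updated_at column, so the server-side heartbeat callback must ultimately bump that column for the scheme to work. A hedged sketch of what that update might look like (the Notification model and column names are assumptions, not confirmed by this diff):

    from datetime import UTC, datetime

    from sqlalchemy import update
    from sqlalchemy.ext.asyncio import AsyncSession

    async def bump_notification_heartbeat(session: AsyncSession, notification_id: int) -> None:
        # Hypothetical: refreshing updated_at keeps the UI's 15-minute
        # staleness window sliding while the task is alive.
        await session.execute(
            update(Notification)
            .where(Notification.id == notification_id)
            .values(updated_at=datetime.now(UTC))
        )
        await session.commit()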
diff --git a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-indexing-connectors.ts b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-indexing-connectors.ts
index 19741e020..5783540d8 100644
--- a/surfsense_web/components/assistant-ui/connector-popup/hooks/use-indexing-connectors.ts
+++ b/surfsense_web/components/assistant-ui/connector-popup/hooks/use-indexing-connectors.ts
@@ -5,6 +5,44 @@ import type { SearchSourceConnector } from "@/contracts/types/connector.types";
 import type { InboxItem } from "@/contracts/types/inbox.types";
 import { isConnectorIndexingMetadata } from "@/contracts/types/inbox.types";
 
+/**
+ * Timeout thresholds for stuck task detection
+ *
+ * These align with the backend Celery configuration:
+ * - HARD_TIMEOUT: 8 hours (task_time_limit=28800 in Celery)
+ *   Any task running longer than this is definitely dead.
+ *
+ * - STALE_THRESHOLD: 15 minutes without notification updates
+ *   If heartbeats are being sent every 30s, missing 15+ minutes of updates
+ *   indicates the task has likely crashed or the worker is down.
+ */
+const HARD_TIMEOUT_MS = 8 * 60 * 60 * 1000; // 8 hours in milliseconds
+const STALE_THRESHOLD_MS = 15 * 60 * 1000; // 15 minutes in milliseconds
+
+/**
+ * Check if a notification is stale (no updates for too long)
+ * @param updatedAt - ISO timestamp of last notification update
+ * @returns true if the notification hasn't been updated recently
+ */
+function isNotificationStale(updatedAt: string | null | undefined): boolean {
+	if (!updatedAt) return false;
+	const lastUpdate = new Date(updatedAt).getTime();
+	const now = Date.now();
+	return now - lastUpdate > STALE_THRESHOLD_MS;
+}
+
+/**
+ * Check if a task has exceeded the hard timeout (definitely dead)
+ * @param startedAt - ISO timestamp when the task started
+ * @returns true if the task has been running longer than the hard limit
+ */
+function isTaskTimedOut(startedAt: string | null | undefined): boolean {
+	if (!startedAt) return false;
+	const startTime = new Date(startedAt).getTime();
+	const now = Date.now();
+	return now - startTime > HARD_TIMEOUT_MS;
+}
+
 /**
  * Hook to track which connectors are currently indexing using local state.
  *
@@ -13,6 +51,8 @@ import { isConnectorIndexingMetadata } from "@/contracts/types/inbox.types";
  * 2. Detecting in_progress notifications from Electric SQL to restore state after remounts
  * 3. Clearing indexing state when notifications become completed or failed
  * 4. Clearing indexing state when Electric SQL detects last_indexed_at changed
+ * 5. Detecting stale/stuck tasks that haven't updated in 15+ minutes
+ * 6. Detecting hard timeout (8h) - tasks that definitely cannot still be running
  *
  * The actual `last_indexed_at` value comes from Electric SQL/PGlite, not local state.
  */
@@ -57,6 +97,7 @@ export function useIndexingConnectors(
 
 	// Detect notification status changes and update indexing state accordingly
 	// This restores spinner state after component remounts and handles all status transitions
+	// Also detects stale/stuck tasks that haven't been updated in a while
 	useEffect(() => {
 		if (!inboxItems || inboxItems.length === 0) return;
 
@@ -71,11 +112,26 @@ export function useIndexingConnectors(
 			const metadata = isConnectorIndexingMetadata(item.metadata) ? item.metadata : null;
 			if (!metadata) continue;
 
-			// If status is "in_progress", add connector to indexing set
+			// If status is "in_progress", check if it's actually still running
 			if (metadata.status === "in_progress") {
-				if (!newIndexingIds.has(metadata.connector_id)) {
-					newIndexingIds.add(metadata.connector_id);
-					hasChanges = true;
-				}
+				// Check for hard timeout (8h) - task is definitely dead
+				const timedOut = isTaskTimedOut(metadata.started_at);
+
+				// Check for stale notification (15min without updates) - task likely crashed
+				const stale = isNotificationStale(item.updated_at);
+
+				if (timedOut || stale) {
+					// Task is stuck - don't show as indexing
+					if (newIndexingIds.has(metadata.connector_id)) {
+						newIndexingIds.delete(metadata.connector_id);
+						hasChanges = true;
+					}
+				} else {
+					// Task appears to be genuinely running
+					if (!newIndexingIds.has(metadata.connector_id)) {
+						newIndexingIds.add(metadata.connector_id);
+						hasChanges = true;
+					}
+				}
 			}
 			// If status is "completed" or "failed", remove connector from indexing set
diff --git a/surfsense_web/lib/electric/client.ts b/surfsense_web/lib/electric/client.ts
index d25e268be..177a66d28 100644
--- a/surfsense_web/lib/electric/client.ts
+++ b/surfsense_web/lib/electric/client.ts
@@ -55,7 +55,8 @@ const pendingSyncs = new Map<string, Promise<void>>();
 // Version for sync state - increment this to force fresh sync when Electric config changes
 // v2: user-specific database architecture
 // v3: consistent cutoff date for sync+queries, visibility refresh support
-const SYNC_VERSION = 3;
+// v4: heartbeat-based stale notification detection with updated_at tracking
+const SYNC_VERSION = 4;
 
 // Database name prefix for identifying SurfSense databases
 const DB_PREFIX = "surfsense-";
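The same two thresholds the hook applies client-side are what the beat-scheduled cleanup task enforces server-side, so a notification stuck in "in_progress" is eventually marked failed even if no browser is open. A sketch of just that decision logic in Python (constant and field names assumed, not confirmed by this diff):

    from datetime import UTC, datetime, timedelta

    STALE_THRESHOLD = timedelta(minutes=15)  # mirrors STALE_THRESHOLD_MS
    HARD_TIMEOUT = timedelta(hours=8)        # mirrors task_time_limit=28800

    def should_mark_failed(status: str, started_at: datetime, updated_at: datetime) -> bool:
        # A notification is considered dead if it is still "in_progress" but has
        # either stopped heartbeating or outlived Celery's hard time limit.
        if status != "in_progress":
            return False
        now = datetime.now(UTC)
        return (now - updated_at) > STALE_THRESHOLD or (now - started_at) > HARD_TIMEOUT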