diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example
index 3ddfdd652..289b45e2f 100644
--- a/surfsense_backend/.env.example
+++ b/surfsense_backend/.env.example
@@ -3,6 +3,7 @@
 DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense
 #Celery Config
 CELERY_BROKER_URL=redis://localhost:6379/0
 CELERY_RESULT_BACKEND=redis://localhost:6379/0
+# Dynamic periodic task creation: run Celery Beat with `uv run celery -A celery_worker.celery_app beat --loglevel=info`
 SECRET_KEY=SECRET
 NEXT_FRONTEND_URL=http://localhost:3000
diff --git a/surfsense_backend/.gitignore b/surfsense_backend/.gitignore
index acb5f8cd9..e9b62fbbd 100644
--- a/surfsense_backend/.gitignore
+++ b/surfsense_backend/.gitignore
@@ -6,4 +6,5 @@ __pycache__/
 .flashrank_cache
 surf_new_backend.egg-info/
 podcasts/
-temp_audio/
\ No newline at end of file
+temp_audio/
+celerybeat-schedule*
diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py
index 109c8b7fc..6cbb5c901 100644
--- a/surfsense_backend/app/celery_app.py
+++ b/surfsense_backend/app/celery_app.py
@@ -3,6 +3,7 @@
 import os
 
 from celery import Celery
+from celery.schedules import crontab
 from dotenv import load_dotenv
 
 # Load environment variables
@@ -21,6 +22,7 @@ celery_app = Celery(
         "app.tasks.celery_tasks.document_tasks",
         "app.tasks.celery_tasks.podcast_tasks",
         "app.tasks.celery_tasks.connector_tasks",
+        "app.tasks.celery_tasks.schedule_checker_task",
     ],
 )
 
@@ -47,13 +49,23 @@ celery_app.conf.update(
     task_reject_on_worker_lost=True,
     # Broker settings
     broker_connection_retry_on_startup=True,
+    # Beat scheduler settings
+    beat_max_loop_interval=60,  # Check every minute
 )
 
-# Optional: Configure Celery Beat for periodic tasks
+# Configure Celery Beat schedule
+# This uses a meta-scheduler pattern: instead of creating individual Beat schedules
+# for each connector, we have ONE schedule that checks the database every minute
+# for connectors that need indexing. This provides dynamic scheduling without restarts.
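+# The Beat entry below is static; per-connector timing lives in the database
+# (next_scheduled_at / indexing_frequency_minutes), so enabling, changing, or
+# disabling a connector's schedule never requires restarting Beat.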
 celery_app.conf.beat_schedule = {
-    # Example: Add periodic tasks here if needed
-    # "periodic-task-name": {
-    #     "task": "app.tasks.celery_tasks.some_task",
-    #     "schedule": crontab(minute=0, hour=0),  # Run daily at midnight
-    # },
+    "check-periodic-connector-schedules": {
+        "task": "check_periodic_schedules",
+        "schedule": crontab(minute="*"),  # Run every minute
+        "options": {
+            "expires": 30,  # Task expires after 30 seconds if not picked up
+        },
+    },
 }
diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py
index 4a6774113..ab3d1c3c2 100644
--- a/surfsense_backend/app/routes/search_source_connectors_routes.py
+++ b/surfsense_backend/app/routes/search_source_connectors_routes.py
@@ -52,6 +52,11 @@ from app.tasks.connector_indexers import (
 )
 from app.users import current_active_user
 from app.utils.check_ownership import check_ownership
+from app.utils.periodic_scheduler import (
+    create_periodic_schedule,
+    delete_periodic_schedule,
+    update_periodic_schedule,
+)
 
 # Set up logging
 logger = logging.getLogger(__name__)
@@ -144,6 +149,21 @@ async def create_search_source_connector(
         session.add(db_connector)
         await session.commit()
         await session.refresh(db_connector)
+
+        # Create periodic schedule if periodic indexing is enabled
+        if db_connector.periodic_indexing_enabled and db_connector.indexing_frequency_minutes:
+            success = create_periodic_schedule(
+                connector_id=db_connector.id,
+                search_space_id=search_space_id,
+                user_id=str(user.id),
+                connector_type=db_connector.connector_type,
+                frequency_minutes=db_connector.indexing_frequency_minutes,
+            )
+            if not success:
+                logger.warning(
+                    f"Failed to create periodic schedule for connector {db_connector.id}"
+                )
+
         return db_connector
     except ValidationError as e:
         await session.rollback()
@@ -348,6 +368,30 @@ async def update_search_source_connector(
     try:
         await session.commit()
         await session.refresh(db_connector)
+
+        # Handle periodic schedule updates
+        if "periodic_indexing_enabled" in update_data or "indexing_frequency_minutes" in update_data:
+            if db_connector.periodic_indexing_enabled and db_connector.indexing_frequency_minutes:
+                # Create or update the periodic schedule
+                success = update_periodic_schedule(
+                    connector_id=db_connector.id,
+                    search_space_id=db_connector.search_space_id,
+                    user_id=str(user.id),
+                    connector_type=db_connector.connector_type,
+                    frequency_minutes=db_connector.indexing_frequency_minutes,
+                )
+                if not success:
+                    logger.warning(
+                        f"Failed to update periodic schedule for connector {db_connector.id}"
+                    )
+            else:
+                # Delete the periodic schedule if disabled
+                success = delete_periodic_schedule(db_connector.id)
+                if not success:
+                    logger.warning(
+                        f"Failed to delete periodic schedule for connector {db_connector.id}"
+                    )
+
         return db_connector
     except IntegrityError as e:
         await session.rollback()
@@ -378,6 +422,15 @@ async def delete_search_source_connector(
     db_connector = await check_ownership(
         session, SearchSourceConnector, connector_id, user
     )
+
+    # Delete any periodic schedule associated with this connector
+    if db_connector.periodic_indexing_enabled:
+        success = delete_periodic_schedule(connector_id)
+        if not success:
+            logger.warning(
+                f"Failed to delete periodic schedule for connector {connector_id}"
+            )
+
     await session.delete(db_connector)
     await session.commit()
     return {"message": "Search source connector deleted successfully"}
diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py
new file mode 100644
index 000000000..609b4fdd7
--- /dev/null
+++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py
@@ -0,0 +1,132 @@
+"""Meta-scheduler task that checks for connectors needing periodic indexing."""
+
+import logging
+from datetime import UTC, datetime, timedelta
+
+from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
+from sqlalchemy.future import select
+from sqlalchemy.pool import NullPool
+
+from app.celery_app import celery_app
+from app.config import config
+from app.db import SearchSourceConnector, SearchSourceConnectorType
+
+logger = logging.getLogger(__name__)
+
+
+def get_celery_session_maker():
+    """Create an async session maker for Celery tasks."""
+    engine = create_async_engine(
+        config.DATABASE_URL,
+        poolclass=NullPool,
+        echo=False,
+    )
+    return async_sessionmaker(engine, expire_on_commit=False)
+
+
+@celery_app.task(name="check_periodic_schedules")
+def check_periodic_schedules_task():
+    """
+    Check all connectors for periodic indexing that is due.
+
+    This task runs every minute and triggers indexing for any connector
+    whose next_scheduled_at time has passed.
+    """
+    import asyncio
+
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+
+    try:
+        loop.run_until_complete(_check_and_trigger_schedules())
+    finally:
+        loop.close()
+
+
+async def _check_and_trigger_schedules():
+    """Check the database for connectors that need indexing and trigger their tasks."""
+    async with get_celery_session_maker()() as session:
+        try:
+            # Find all connectors with periodic indexing enabled that are due
+            now = datetime.now(UTC)
+            result = await session.execute(
+                select(SearchSourceConnector).filter(
+                    SearchSourceConnector.periodic_indexing_enabled.is_(True),
+                    SearchSourceConnector.next_scheduled_at <= now,
+                )
+            )
+            due_connectors = result.scalars().all()
+
+            if not due_connectors:
+                logger.debug("No connectors due for periodic indexing")
+                return
+
+            logger.info(f"Found {len(due_connectors)} connectors due for indexing")
+
+            # Import all indexing tasks
+            from app.tasks.celery_tasks.connector_tasks import (
+                index_airtable_records_task,
+                index_clickup_tasks_task,
+                index_confluence_pages_task,
+                index_discord_messages_task,
+                index_elasticsearch_documents_task,
+                index_github_repos_task,
+                index_google_calendar_events_task,
+                index_google_gmail_messages_task,
+                index_jira_issues_task,
+                index_linear_issues_task,
+                index_luma_events_task,
+                index_notion_pages_task,
+                index_slack_messages_task,
+            )
+
+            # Map connector types to their tasks
+            task_map = {
+                SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task,
+                SearchSourceConnectorType.NOTION_CONNECTOR: index_notion_pages_task,
+                SearchSourceConnectorType.GITHUB_CONNECTOR: index_github_repos_task,
+                SearchSourceConnectorType.LINEAR_CONNECTOR: index_linear_issues_task,
+                SearchSourceConnectorType.JIRA_CONNECTOR: index_jira_issues_task,
+                SearchSourceConnectorType.CONFLUENCE_CONNECTOR: index_confluence_pages_task,
+                SearchSourceConnectorType.CLICKUP_CONNECTOR: index_clickup_tasks_task,
+                SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task,
+                SearchSourceConnectorType.AIRTABLE_CONNECTOR: index_airtable_records_task,
+                SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: index_google_gmail_messages_task,
+                SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task,
+                SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
+                SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
+            }
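+
+            # Note: task_map must cover every SearchSourceConnectorType that can
+            # have periodic indexing enabled; unmapped types are logged and
+            # skipped below rather than failing the whole sweep.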
+
+            # Trigger indexing for each due connector
+            for connector in due_connectors:
+                task = task_map.get(connector.connector_type)
+                if task:
+                    logger.info(
+                        f"Triggering periodic indexing for connector {connector.id} "
+                        f"({connector.connector_type.value})"
+                    )
+                    task.delay(
+                        connector.id,
+                        connector.search_space_id,
+                        str(connector.user_id),
+                        None,  # start_date - uses last_indexed_at
+                        None,  # end_date - uses now
+                    )
+
+                    # Update next_scheduled_at for the next run
+                    connector.next_scheduled_at = now + timedelta(
+                        minutes=connector.indexing_frequency_minutes
+                    )
+                    await session.commit()
+                else:
+                    logger.warning(
+                        f"No task found for connector type {connector.connector_type}"
+                    )
+
+        except Exception as e:
+            logger.error(f"Error checking periodic schedules: {e!s}", exc_info=True)
+            await session.rollback()
diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py
new file mode 100644
index 000000000..5aae96854
--- /dev/null
+++ b/surfsense_backend/app/utils/periodic_scheduler.py
@@ -0,0 +1,178 @@
+"""
+Utility functions for managing periodic connector indexing schedules.
+
+This module uses a meta-scheduler pattern instead of RedBeat's dynamic schedule creation.
+Instead of creating individual Beat schedules for each connector, we:
+1. Store schedule configuration in the database (next_scheduled_at, frequency)
+2. Have ONE Beat task that runs every minute checking for due connectors
+3. Trigger indexing tasks for connectors whose next_scheduled_at has passed
+
+This avoids RedBeat's limitation where new schedules aren't discovered without a restart.
+"""
+
+import logging
+
+from app.db import SearchSourceConnectorType
+
+logger = logging.getLogger(__name__)
+
+# Mapping of connector types to their corresponding Celery task names
+CONNECTOR_TASK_MAP = {
+    SearchSourceConnectorType.SLACK_CONNECTOR: "index_slack_messages",
+    SearchSourceConnectorType.NOTION_CONNECTOR: "index_notion_pages",
+    SearchSourceConnectorType.GITHUB_CONNECTOR: "index_github_repos",
+    SearchSourceConnectorType.LINEAR_CONNECTOR: "index_linear_issues",
+    SearchSourceConnectorType.JIRA_CONNECTOR: "index_jira_issues",
+    SearchSourceConnectorType.CONFLUENCE_CONNECTOR: "index_confluence_pages",
+    SearchSourceConnectorType.CLICKUP_CONNECTOR: "index_clickup_tasks",
+    SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: "index_google_calendar_events",
+    SearchSourceConnectorType.AIRTABLE_CONNECTOR: "index_airtable_records",
+    SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: "index_google_gmail_messages",
+    SearchSourceConnectorType.DISCORD_CONNECTOR: "index_discord_messages",
+    SearchSourceConnectorType.LUMA_CONNECTOR: "index_luma_events",
+    SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: "index_elasticsearch_documents",
+}
+
+
+def create_periodic_schedule(
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    connector_type: SearchSourceConnectorType,
+    frequency_minutes: int,
+) -> bool:
+    """
+    Trigger the first indexing run immediately when periodic indexing is enabled.
+
+    Note: The periodic schedule is managed by the database (next_scheduled_at field)
+    and checked by the meta-scheduler task that runs every minute.
+    This function just triggers the first run for immediate feedback.
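+
+    Subsequent runs are handled by the meta-scheduler, which advances
+    next_scheduled_at by frequency_minutes after each triggered run.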
+
+    Args:
+        connector_id: ID of the connector
+        search_space_id: ID of the search space
+        user_id: User ID
+        connector_type: Type of connector
+        frequency_minutes: Frequency in minutes (used for logging)
+
+    Returns:
+        True if successful, False otherwise
+    """
+    try:
+        logger.info(
+            f"Periodic indexing enabled for connector {connector_id} "
+            f"(frequency: {frequency_minutes} minutes). Triggering first run..."
+        )
+
+        # Import all indexing tasks
+        from app.tasks.celery_tasks.connector_tasks import (
+            index_airtable_records_task,
+            index_clickup_tasks_task,
+            index_confluence_pages_task,
+            index_discord_messages_task,
+            index_elasticsearch_documents_task,
+            index_github_repos_task,
+            index_google_calendar_events_task,
+            index_google_gmail_messages_task,
+            index_jira_issues_task,
+            index_linear_issues_task,
+            index_luma_events_task,
+            index_notion_pages_task,
+            index_slack_messages_task,
+        )
+
+        # Map connector type to task
+        task_map = {
+            SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task,
+            SearchSourceConnectorType.NOTION_CONNECTOR: index_notion_pages_task,
+            SearchSourceConnectorType.GITHUB_CONNECTOR: index_github_repos_task,
+            SearchSourceConnectorType.LINEAR_CONNECTOR: index_linear_issues_task,
+            SearchSourceConnectorType.JIRA_CONNECTOR: index_jira_issues_task,
+            SearchSourceConnectorType.CONFLUENCE_CONNECTOR: index_confluence_pages_task,
+            SearchSourceConnectorType.CLICKUP_CONNECTOR: index_clickup_tasks_task,
+            SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task,
+            SearchSourceConnectorType.AIRTABLE_CONNECTOR: index_airtable_records_task,
+            SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: index_google_gmail_messages_task,
+            SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task,
+            SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
+            SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
+        }
+
+        # Trigger the first run immediately
+        task = task_map.get(connector_type)
+        if task:
+            task.delay(connector_id, search_space_id, user_id, None, None)
+            logger.info(
+                f"✓ First indexing run triggered for connector {connector_id}. "
+                f"Periodic indexing will continue automatically every {frequency_minutes} minutes."
+            )
+        else:
+            logger.error(f"No task mapping found for connector type: {connector_type}")
+            return False
+
+        return True
+
+    except Exception as e:
+        logger.error(
+            f"Failed to trigger initial indexing for connector {connector_id}: {e!s}",
+            exc_info=True,
+        )
+        return False
+
+
+def delete_periodic_schedule(connector_id: int) -> bool:
+    """
+    Handle deletion of the periodic schedule for a connector.
+
+    Note: With the meta-scheduler pattern, schedule state lives in the database;
+    once periodic_indexing_enabled is False the meta-scheduler skips the connector,
+    so there is no Beat entry to remove. This function just logs the action.
+
+    Args:
+        connector_id: ID of the connector
+
+    Returns:
+        True (always successful since the database handles the state)
+    """
+    logger.info(f"Periodic indexing disabled for connector {connector_id}")
+    return True
+
+
+def update_periodic_schedule(
+    connector_id: int,
+    search_space_id: int,
+    user_id: str,
+    connector_type: SearchSourceConnectorType,
+    frequency_minutes: int,
+) -> bool:
+    """
+    Update an existing periodic schedule for a connector.
+
+    Note: With the meta-scheduler pattern, updates are handled by the database.
+    This function logs the update and optionally triggers an immediate run.
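+
+    A changed frequency_minutes takes effect when the meta-scheduler next
+    advances next_scheduled_at for this connector.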
+
+    Args:
+        connector_id: ID of the connector
+        search_space_id: ID of the search space
+        user_id: User ID
+        connector_type: Type of connector
+        frequency_minutes: New frequency in minutes
+
+    Returns:
+        True if successful, False otherwise
+    """
+    logger.info(
+        f"Periodic indexing schedule updated for connector {connector_id} "
+        f"(new frequency: {frequency_minutes} minutes)"
+    )
+    # Optionally trigger an immediate run with the new schedule
+    # Uncomment the line below if you want immediate execution on schedule update
+    # return create_periodic_schedule(connector_id, search_space_id, user_id, connector_type, frequency_minutes)
+    return True
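
The rescheduling contract the meta-scheduler implements can be checked in isolation. A minimal sketch of the due-check and reschedule arithmetic (illustrative only, not part of the patch; the connector state below is hypothetical):

from datetime import UTC, datetime, timedelta

# Hypothetical state for one connector: overdue by 5 seconds, hourly frequency.
now = datetime.now(UTC)
next_scheduled_at = now - timedelta(seconds=5)
indexing_frequency_minutes = 60

# Same predicate check_periodic_schedules uses to select due connectors.
if next_scheduled_at <= now:
    # The next slot is computed from the trigger time rather than the previous
    # slot, so each cycle can drift forward by up to one Beat interval.
    next_scheduled_at = now + timedelta(minutes=indexing_frequency_minutes)

print(f"Next run at {next_scheduled_at.isoformat()}")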