From 8706df67166a6909eca6464c38635af0f1e5e3c8 Mon Sep 17 00:00:00 2001 From: "MSI\\ModSetter" Date: Wed, 22 Oct 2025 19:49:44 -0700 Subject: [PATCH] feat: added meta-scheduler for running periodic tasks --- .../routes/search_source_connectors_routes.py | 19 ++++++++++++++----- .../celery_tasks/schedule_checker_task.py | 1 - .../app/utils/periodic_scheduler.py | 13 ++++++------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index ab3d1c3c2..bd24efd49 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -151,7 +151,10 @@ async def create_search_source_connector( await session.refresh(db_connector) # Create periodic schedule if periodic indexing is enabled - if db_connector.periodic_indexing_enabled and db_connector.indexing_frequency_minutes: + if ( + db_connector.periodic_indexing_enabled + and db_connector.indexing_frequency_minutes + ): success = create_periodic_schedule( connector_id=db_connector.id, search_space_id=search_space_id, @@ -370,8 +373,14 @@ async def update_search_source_connector( await session.refresh(db_connector) # Handle periodic schedule updates - if "periodic_indexing_enabled" in update_data or "indexing_frequency_minutes" in update_data: - if db_connector.periodic_indexing_enabled and db_connector.indexing_frequency_minutes: + if ( + "periodic_indexing_enabled" in update_data + or "indexing_frequency_minutes" in update_data + ): + if ( + db_connector.periodic_indexing_enabled + and db_connector.indexing_frequency_minutes + ): # Create or update the periodic schedule success = update_periodic_schedule( connector_id=db_connector.id, @@ -422,7 +431,7 @@ async def delete_search_source_connector( db_connector = await check_ownership( session, SearchSourceConnector, connector_id, user ) - + # Delete any periodic schedule associated with this connector if db_connector.periodic_indexing_enabled: success = delete_periodic_schedule(connector_id) @@ -430,7 +439,7 @@ async def delete_search_source_connector( logger.warning( f"Failed to delete periodic schedule for connector {connector_id}" ) - + await session.delete(db_connector) await session.commit() return {"message": "Search source connector deleted successfully"} diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py index 609b4fdd7..39d6bf840 100644 --- a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -127,4 +127,3 @@ async def _check_and_trigger_schedules(): except Exception as e: logger.error(f"Error checking periodic schedules: {e!s}", exc_info=True) await session.rollback() - diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py index 5aae96854..225425714 100644 --- a/surfsense_backend/app/utils/periodic_scheduler.py +++ b/surfsense_backend/app/utils/periodic_scheduler.py @@ -43,7 +43,7 @@ def create_periodic_schedule( ) -> bool: """ Trigger the first indexing run immediately when periodic indexing is enabled. - + Note: The periodic schedule is managed by the database (next_scheduled_at field) and checked by the meta-scheduler task that runs every minute. This function just triggers the first run for immediate feedback. @@ -63,7 +63,7 @@ def create_periodic_schedule( f"Periodic indexing enabled for connector {connector_id} " f"(frequency: {frequency_minutes} minutes). Triggering first run..." ) - + # Import all indexing tasks from app.tasks.celery_tasks.connector_tasks import ( index_airtable_records_task, @@ -80,7 +80,7 @@ def create_periodic_schedule( index_notion_pages_task, index_slack_messages_task, ) - + # Map connector type to task task_map = { SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task, @@ -97,7 +97,7 @@ def create_periodic_schedule( SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task, SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, } - + # Trigger the first run immediately task = task_map.get(connector_type) if task: @@ -123,7 +123,7 @@ def create_periodic_schedule( def delete_periodic_schedule(connector_id: int) -> bool: """ Handle deletion of periodic schedule for a connector. - + Note: With the meta-scheduler pattern, the schedule is managed in the database. The next_scheduled_at field being set to None effectively disables it. This function just logs the action. @@ -147,7 +147,7 @@ def update_periodic_schedule( ) -> bool: """ Update an existing periodic schedule for a connector. - + Note: With the meta-scheduler pattern, updates are handled by the database. This function logs the update and optionally triggers an immediate run. @@ -169,4 +169,3 @@ def update_periodic_schedule( # Uncomment the line below if you want immediate execution on schedule update # return create_periodic_schedule(connector_id, search_space_id, user_id, connector_type, frequency_minutes) return True -