feat: added meta-scheduler for running periodic tasks

This commit is contained in:
MSI\ModSetter 2025-10-22 19:49:44 -07:00
parent a90767a478
commit 8706df6716
3 changed files with 20 additions and 13 deletions

View file

@ -151,7 +151,10 @@ async def create_search_source_connector(
await session.refresh(db_connector) await session.refresh(db_connector)
# Create periodic schedule if periodic indexing is enabled # Create periodic schedule if periodic indexing is enabled
if db_connector.periodic_indexing_enabled and db_connector.indexing_frequency_minutes: if (
db_connector.periodic_indexing_enabled
and db_connector.indexing_frequency_minutes
):
success = create_periodic_schedule( success = create_periodic_schedule(
connector_id=db_connector.id, connector_id=db_connector.id,
search_space_id=search_space_id, search_space_id=search_space_id,
@ -370,8 +373,14 @@ async def update_search_source_connector(
await session.refresh(db_connector) await session.refresh(db_connector)
# Handle periodic schedule updates # Handle periodic schedule updates
if "periodic_indexing_enabled" in update_data or "indexing_frequency_minutes" in update_data: if (
if db_connector.periodic_indexing_enabled and db_connector.indexing_frequency_minutes: "periodic_indexing_enabled" in update_data
or "indexing_frequency_minutes" in update_data
):
if (
db_connector.periodic_indexing_enabled
and db_connector.indexing_frequency_minutes
):
# Create or update the periodic schedule # Create or update the periodic schedule
success = update_periodic_schedule( success = update_periodic_schedule(
connector_id=db_connector.id, connector_id=db_connector.id,
@ -422,7 +431,7 @@ async def delete_search_source_connector(
db_connector = await check_ownership( db_connector = await check_ownership(
session, SearchSourceConnector, connector_id, user session, SearchSourceConnector, connector_id, user
) )
# Delete any periodic schedule associated with this connector # Delete any periodic schedule associated with this connector
if db_connector.periodic_indexing_enabled: if db_connector.periodic_indexing_enabled:
success = delete_periodic_schedule(connector_id) success = delete_periodic_schedule(connector_id)
@ -430,7 +439,7 @@ async def delete_search_source_connector(
logger.warning( logger.warning(
f"Failed to delete periodic schedule for connector {connector_id}" f"Failed to delete periodic schedule for connector {connector_id}"
) )
await session.delete(db_connector) await session.delete(db_connector)
await session.commit() await session.commit()
return {"message": "Search source connector deleted successfully"} return {"message": "Search source connector deleted successfully"}

View file

@ -127,4 +127,3 @@ async def _check_and_trigger_schedules():
except Exception as e: except Exception as e:
logger.error(f"Error checking periodic schedules: {e!s}", exc_info=True) logger.error(f"Error checking periodic schedules: {e!s}", exc_info=True)
await session.rollback() await session.rollback()

View file

@ -43,7 +43,7 @@ def create_periodic_schedule(
) -> bool: ) -> bool:
""" """
Trigger the first indexing run immediately when periodic indexing is enabled. Trigger the first indexing run immediately when periodic indexing is enabled.
Note: The periodic schedule is managed by the database (next_scheduled_at field) Note: The periodic schedule is managed by the database (next_scheduled_at field)
and checked by the meta-scheduler task that runs every minute. and checked by the meta-scheduler task that runs every minute.
This function just triggers the first run for immediate feedback. This function just triggers the first run for immediate feedback.
@ -63,7 +63,7 @@ def create_periodic_schedule(
f"Periodic indexing enabled for connector {connector_id} " f"Periodic indexing enabled for connector {connector_id} "
f"(frequency: {frequency_minutes} minutes). Triggering first run..." f"(frequency: {frequency_minutes} minutes). Triggering first run..."
) )
# Import all indexing tasks # Import all indexing tasks
from app.tasks.celery_tasks.connector_tasks import ( from app.tasks.celery_tasks.connector_tasks import (
index_airtable_records_task, index_airtable_records_task,
@ -80,7 +80,7 @@ def create_periodic_schedule(
index_notion_pages_task, index_notion_pages_task,
index_slack_messages_task, index_slack_messages_task,
) )
# Map connector type to task # Map connector type to task
task_map = { task_map = {
SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task, SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task,
@ -97,7 +97,7 @@ def create_periodic_schedule(
SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task, SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
} }
# Trigger the first run immediately # Trigger the first run immediately
task = task_map.get(connector_type) task = task_map.get(connector_type)
if task: if task:
@ -123,7 +123,7 @@ def create_periodic_schedule(
def delete_periodic_schedule(connector_id: int) -> bool: def delete_periodic_schedule(connector_id: int) -> bool:
""" """
Handle deletion of periodic schedule for a connector. Handle deletion of periodic schedule for a connector.
Note: With the meta-scheduler pattern, the schedule is managed in the database. Note: With the meta-scheduler pattern, the schedule is managed in the database.
The next_scheduled_at field being set to None effectively disables it. The next_scheduled_at field being set to None effectively disables it.
This function just logs the action. This function just logs the action.
@ -147,7 +147,7 @@ def update_periodic_schedule(
) -> bool: ) -> bool:
""" """
Update an existing periodic schedule for a connector. Update an existing periodic schedule for a connector.
Note: With the meta-scheduler pattern, updates are handled by the database. Note: With the meta-scheduler pattern, updates are handled by the database.
This function logs the update and optionally triggers an immediate run. This function logs the update and optionally triggers an immediate run.
@ -169,4 +169,3 @@ def update_periodic_schedule(
# Uncomment the line below if you want immediate execution on schedule update # Uncomment the line below if you want immediate execution on schedule update
# return create_periodic_schedule(connector_id, search_space_id, user_id, connector_type, frequency_minutes) # return create_periodic_schedule(connector_id, search_space_id, user_id, connector_type, frequency_minutes)
return True return True