feat: added meta-scheduler for running periodic tasks

This commit is contained in:
MSI\ModSetter 2025-10-22 19:49:30 -07:00
parent 182f815bb7
commit a90767a478
6 changed files with 376 additions and 7 deletions

View file

@ -3,6 +3,7 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense
#Celery Config
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
# Periodic connector indexing (meta-scheduler): start Celery Beat with `uv run celery -A celery_worker.celery_app beat --loglevel=info`
SECRET_KEY=SECRET
NEXT_FRONTEND_URL=http://localhost:3000

View file

@ -6,4 +6,8 @@ __pycache__/
.flashrank_cache
surf_new_backend.egg-info/
podcasts/
temp_audio/
temp_audio/
celerybeat-schedule*
celerybeat-schedule.*
celerybeat-schedule.dir
celerybeat-schedule.bak

View file

@ -3,6 +3,7 @@
import os
from celery import Celery
from celery.schedules import crontab
from dotenv import load_dotenv
# Load environment variables
@ -21,6 +22,7 @@ celery_app = Celery(
"app.tasks.celery_tasks.document_tasks",
"app.tasks.celery_tasks.podcast_tasks",
"app.tasks.celery_tasks.connector_tasks",
"app.tasks.celery_tasks.schedule_checker_task",
],
)
@ -47,13 +49,20 @@ celery_app.conf.update(
task_reject_on_worker_lost=True,
# Broker settings
broker_connection_retry_on_startup=True,
# Beat scheduler settings
beat_max_loop_interval=60, # Check every minute
)
# Optional: Configure Celery Beat for periodic tasks
# Configure Celery Beat schedule
# This uses a meta-scheduler pattern: instead of creating individual Beat schedules
# for each connector, we have ONE schedule that checks the database every minute
# for connectors that need indexing. This provides dynamic scheduling without restarts.
# Beat schedule: a single entry that fires the meta-scheduler task. The "task"
# value below must match the name the checker task is registered under
# ("check_periodic_schedules").
celery_app.conf.beat_schedule = {
# Example: Add periodic tasks here if needed
# "periodic-task-name": {
# "task": "app.tasks.celery_tasks.some_task",
# "schedule": crontab(minute=0, hour=0), # Run daily at midnight
# },
"check-periodic-connector-schedules": {
"task": "check_periodic_schedules",
"schedule": crontab(minute="*"), # Run every minute
"options": {
# A tick that is not picked up within 30 seconds is dropped; the next
# tick (one minute later) re-checks the database anyway.
"expires": 30, # Task expires after 30 seconds if not picked up
},
},
}

View file

@ -52,6 +52,11 @@ from app.tasks.connector_indexers import (
)
from app.users import current_active_user
from app.utils.check_ownership import check_ownership
from app.utils.periodic_scheduler import (
create_periodic_schedule,
delete_periodic_schedule,
update_periodic_schedule,
)
# Set up logging
logger = logging.getLogger(__name__)
@ -144,6 +149,21 @@ async def create_search_source_connector(
session.add(db_connector)
await session.commit()
await session.refresh(db_connector)
# Create periodic schedule if periodic indexing is enabled
if db_connector.periodic_indexing_enabled and db_connector.indexing_frequency_minutes:
success = create_periodic_schedule(
connector_id=db_connector.id,
search_space_id=search_space_id,
user_id=str(user.id),
connector_type=db_connector.connector_type,
frequency_minutes=db_connector.indexing_frequency_minutes,
)
if not success:
logger.warning(
f"Failed to create periodic schedule for connector {db_connector.id}"
)
return db_connector
except ValidationError as e:
await session.rollback()
@ -348,6 +368,30 @@ async def update_search_source_connector(
try:
await session.commit()
await session.refresh(db_connector)
# Handle periodic schedule updates
if "periodic_indexing_enabled" in update_data or "indexing_frequency_minutes" in update_data:
if db_connector.periodic_indexing_enabled and db_connector.indexing_frequency_minutes:
# Create or update the periodic schedule
success = update_periodic_schedule(
connector_id=db_connector.id,
search_space_id=db_connector.search_space_id,
user_id=str(user.id),
connector_type=db_connector.connector_type,
frequency_minutes=db_connector.indexing_frequency_minutes,
)
if not success:
logger.warning(
f"Failed to update periodic schedule for connector {db_connector.id}"
)
else:
# Delete the periodic schedule if disabled
success = delete_periodic_schedule(db_connector.id)
if not success:
logger.warning(
f"Failed to delete periodic schedule for connector {db_connector.id}"
)
return db_connector
except IntegrityError as e:
await session.rollback()
@ -378,6 +422,15 @@ async def delete_search_source_connector(
db_connector = await check_ownership(
session, SearchSourceConnector, connector_id, user
)
# Delete any periodic schedule associated with this connector
if db_connector.periodic_indexing_enabled:
success = delete_periodic_schedule(connector_id)
if not success:
logger.warning(
f"Failed to delete periodic schedule for connector {connector_id}"
)
await session.delete(db_connector)
await session.commit()
return {"message": "Search source connector deleted successfully"}

View file

@ -0,0 +1,130 @@
"""Meta-scheduler task that checks for connectors needing periodic indexing."""
import logging
from datetime import UTC, datetime
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.future import select
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.db import SearchSourceConnector, SearchSourceConnectorType
logger = logging.getLogger(__name__)
def get_celery_session_maker():
    """Build a fresh async session factory for use inside a Celery task.

    Every call creates a new engine from ``config.DATABASE_URL`` using
    ``NullPool`` and with SQL echo disabled.

    Returns:
        An ``async_sessionmaker`` bound to the new engine, configured with
        ``expire_on_commit=False``.
    """
    task_engine = create_async_engine(
        config.DATABASE_URL, echo=False, poolclass=NullPool
    )
    session_factory = async_sessionmaker(task_engine, expire_on_commit=False)
    return session_factory
@celery_app.task(name="check_periodic_schedules")
def check_periodic_schedules_task():
    """Celery entry point for the periodic-indexing meta-scheduler.

    Creates an event loop for this invocation, installs it as the current
    loop, and drives ``_check_and_trigger_schedules`` to completion on it.
    The loop is closed afterwards, whether or not the coroutine raised.
    """
    import asyncio

    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    try:
        pending_check = _check_and_trigger_schedules()
        event_loop.run_until_complete(pending_check)
    finally:
        event_loop.close()
async def _check_and_trigger_schedules():
    """Dispatch indexing tasks for every connector whose schedule is due.

    Selects connectors that have ``periodic_indexing_enabled`` set and a
    ``next_scheduled_at`` at or before the current UTC time. For each one it
    queues the Celery indexing task mapped to the connector's type, advances
    ``next_scheduled_at`` by ``indexing_frequency_minutes`` and commits.

    Connectors are skipped (with a warning) when:
      * their connector type has no entry in the task map, or
      * ``indexing_frequency_minutes`` is missing, zero or negative. This is
        checked *before* the task is queued, so a bad row can neither raise
        after dispatch and abort the rest of the batch, nor be re-dispatched
        on every tick because its schedule never moves forward.

    NOTE(review): rows whose ``next_scheduled_at`` is NULL presumably never
    satisfy the ``<=`` filter, so whoever enables periodic indexing must set
    an initial value — confirm against the code that writes the column.

    Any exception is logged and the session is rolled back; nothing is
    re-raised to the caller.
    """
    # Imported once per call instead of once per connector inside the loop.
    from datetime import timedelta

    async with get_celery_session_maker()() as session:
        try:
            # Find all connectors with periodic indexing enabled that are due
            now = datetime.now(UTC)
            result = await session.execute(
                select(SearchSourceConnector).filter(
                    SearchSourceConnector.periodic_indexing_enabled == True,  # noqa: E712
                    SearchSourceConnector.next_scheduled_at <= now,
                )
            )
            due_connectors = result.scalars().all()

            if not due_connectors:
                logger.debug("No connectors due for periodic indexing")
                return

            logger.info(f"Found {len(due_connectors)} connectors due for indexing")

            # Imported lazily, only when there is something to dispatch.
            from app.tasks.celery_tasks.connector_tasks import (
                index_airtable_records_task,
                index_clickup_tasks_task,
                index_confluence_pages_task,
                index_discord_messages_task,
                index_elasticsearch_documents_task,
                index_github_repos_task,
                index_google_calendar_events_task,
                index_google_gmail_messages_task,
                index_jira_issues_task,
                index_linear_issues_task,
                index_luma_events_task,
                index_notion_pages_task,
                index_slack_messages_task,
            )

            # Map connector types to their tasks
            task_map = {
                SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task,
                SearchSourceConnectorType.NOTION_CONNECTOR: index_notion_pages_task,
                SearchSourceConnectorType.GITHUB_CONNECTOR: index_github_repos_task,
                SearchSourceConnectorType.LINEAR_CONNECTOR: index_linear_issues_task,
                SearchSourceConnectorType.JIRA_CONNECTOR: index_jira_issues_task,
                SearchSourceConnectorType.CONFLUENCE_CONNECTOR: index_confluence_pages_task,
                SearchSourceConnectorType.CLICKUP_CONNECTOR: index_clickup_tasks_task,
                SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task,
                SearchSourceConnectorType.AIRTABLE_CONNECTOR: index_airtable_records_task,
                SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: index_google_gmail_messages_task,
                SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task,
                SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
                SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
            }

            # Trigger indexing for each due connector
            for connector in due_connectors:
                task = task_map.get(connector.connector_type)
                if not task:
                    logger.warning(
                        f"No task found for connector type {connector.connector_type}"
                    )
                    continue

                # Validate the interval BEFORE queueing anything: the next
                # run time cannot be computed from a missing or non-positive
                # frequency.
                frequency = connector.indexing_frequency_minutes
                if not frequency or frequency <= 0:
                    logger.warning(
                        f"Skipping periodic indexing for connector {connector.id}: "
                        f"invalid indexing_frequency_minutes ({frequency!r})"
                    )
                    continue

                logger.info(
                    f"Triggering periodic indexing for connector {connector.id} "
                    f"({connector.connector_type.value})"
                )
                task.delay(
                    connector.id,
                    connector.search_space_id,
                    str(connector.user_id),
                    None,  # start_date - uses last_indexed_at
                    None,  # end_date - uses now
                )

                # Update next_scheduled_at for next run; committed per
                # connector so earlier progress survives a later failure.
                connector.next_scheduled_at = now + timedelta(minutes=frequency)
                await session.commit()

        except Exception as e:
            logger.error(f"Error checking periodic schedules: {e!s}", exc_info=True)
            await session.rollback()

View file

@ -0,0 +1,172 @@
"""
Utility functions for managing periodic connector indexing schedules.
This module uses a meta-scheduler pattern instead of RedBeat's dynamic schedule creation.
Instead of creating individual Beat schedules for each connector, we:
1. Store schedule configuration in the database (next_scheduled_at, frequency)
2. Have ONE Beat task that runs every minute checking for due connectors
3. Trigger indexing tasks for connectors whose next_scheduled_at has passed
This avoids RedBeat's limitation where new schedules aren't discovered without restart.
"""
import logging
from app.db import SearchSourceConnectorType
logger = logging.getLogger(__name__)
# Mapping of connector types to their corresponding Celery task names (strings).
# NOTE(review): the functions visible in this module build their own map of
# task objects and do not read this constant — confirm whether anything else
# imports it, and that these strings match the registered task names.
CONNECTOR_TASK_MAP = {
SearchSourceConnectorType.SLACK_CONNECTOR: "index_slack_messages",
SearchSourceConnectorType.NOTION_CONNECTOR: "index_notion_pages",
SearchSourceConnectorType.GITHUB_CONNECTOR: "index_github_repos",
SearchSourceConnectorType.LINEAR_CONNECTOR: "index_linear_issues",
SearchSourceConnectorType.JIRA_CONNECTOR: "index_jira_issues",
SearchSourceConnectorType.CONFLUENCE_CONNECTOR: "index_confluence_pages",
SearchSourceConnectorType.CLICKUP_CONNECTOR: "index_clickup_tasks",
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: "index_google_calendar_events",
SearchSourceConnectorType.AIRTABLE_CONNECTOR: "index_airtable_records",
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: "index_google_gmail_messages",
SearchSourceConnectorType.DISCORD_CONNECTOR: "index_discord_messages",
SearchSourceConnectorType.LUMA_CONNECTOR: "index_luma_events",
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: "index_elasticsearch_documents",
}
def create_periodic_schedule(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    connector_type: SearchSourceConnectorType,
    frequency_minutes: int,
) -> bool:
    """Queue the first indexing run for a connector with periodic indexing on.

    No schedule object is created here. This function only dispatches one
    indexing task right away, passing ``None`` for both date arguments.
    ``frequency_minutes`` is used for log output only.

    Args:
        connector_id: ID of the connector.
        search_space_id: ID of the search space.
        user_id: User ID, forwarded to the indexing task.
        connector_type: Connector type, used to pick the indexing task.
        frequency_minutes: Indexing interval in minutes (logged only).

    Returns:
        True when the indexing task was queued. False when the connector type
        has no mapped task, or when any exception was raised along the way
        (the exception is logged, not propagated).
    """
    try:
        logger.info(
            f"Periodic indexing enabled for connector {connector_id} "
            f"(frequency: {frequency_minutes} minutes). Triggering first run..."
        )

        # Resolved at call time rather than at module import.
        from app.tasks.celery_tasks.connector_tasks import (
            index_airtable_records_task,
            index_clickup_tasks_task,
            index_confluence_pages_task,
            index_discord_messages_task,
            index_elasticsearch_documents_task,
            index_github_repos_task,
            index_google_calendar_events_task,
            index_google_gmail_messages_task,
            index_jira_issues_task,
            index_linear_issues_task,
            index_luma_events_task,
            index_notion_pages_task,
            index_slack_messages_task,
        )

        # Dispatch table: connector type -> Celery indexing task.
        indexers_by_type = {
            SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task,
            SearchSourceConnectorType.NOTION_CONNECTOR: index_notion_pages_task,
            SearchSourceConnectorType.GITHUB_CONNECTOR: index_github_repos_task,
            SearchSourceConnectorType.LINEAR_CONNECTOR: index_linear_issues_task,
            SearchSourceConnectorType.JIRA_CONNECTOR: index_jira_issues_task,
            SearchSourceConnectorType.CONFLUENCE_CONNECTOR: index_confluence_pages_task,
            SearchSourceConnectorType.CLICKUP_CONNECTOR: index_clickup_tasks_task,
            SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task,
            SearchSourceConnectorType.AIRTABLE_CONNECTOR: index_airtable_records_task,
            SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: index_google_gmail_messages_task,
            SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task,
            SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
            SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
        }

        indexing_task = indexers_by_type.get(connector_type)
        if not indexing_task:
            logger.error(f"No task mapping found for connector type: {connector_type}")
            return False

        # Kick off the first run immediately.
        indexing_task.delay(connector_id, search_space_id, user_id, None, None)
        logger.info(
            f"✓ First indexing run triggered for connector {connector_id}. "
            f"Periodic indexing will continue automatically every {frequency_minutes} minutes."
        )
        return True

    except Exception as e:
        logger.error(
            f"Failed to trigger initial indexing for connector {connector_id}: {e!s}",
            exc_info=True,
        )
        return False
def delete_periodic_schedule(connector_id: int) -> bool:
    """Record that periodic indexing was turned off for a connector.

    This function performs no teardown of its own; it only writes an
    info-level log line. Schedule state is expected to live in the database.

    Args:
        connector_id: ID of the connector.

    Returns:
        Always True.
    """
    message = f"Periodic indexing disabled for connector {connector_id}"
    logger.info(message)
    return True
def update_periodic_schedule(
    connector_id: int,
    search_space_id: int,
    user_id: str,
    connector_type: SearchSourceConnectorType,
    frequency_minutes: int,
) -> bool:
    """Record a change to a connector's periodic indexing schedule.

    Only a log line is written; no indexing run is triggered and no schedule
    state is modified here. A caller that wants an immediate run after an
    update can call ``create_periodic_schedule`` with the same arguments.

    Args:
        connector_id: ID of the connector.
        search_space_id: ID of the search space (currently unused).
        user_id: User ID (currently unused).
        connector_type: Type of connector (currently unused).
        frequency_minutes: New frequency in minutes (logged only).

    Returns:
        Always True.
    """
    message = (
        f"Periodic indexing schedule updated for connector {connector_id} "
        f"(new frequency: {frequency_minutes} minutes)"
    )
    logger.info(message)
    return True