diff --git a/README.md b/README.md index 9b5d78c3e..9fe07eb35 100644 --- a/README.md +++ b/README.md @@ -136,14 +136,6 @@ Check out our public roadmap and contribute your ideas or feedback: **View the Roadmap:** [SurfSense Roadmap on GitHub Projects](https://github.com/users/MODSetter/projects/2) -## ⚠️ Important Announcement - -**AWS and Vercel are currently experiencing outages.** We deployed a major update to SurfSense last night and have updated our documentation accordingly with important setup and configuration changes. Unfortunately, these documentation updates cannot be deployed to our main site (surfsense.com) due to the ongoing outages. - -**Please view our documentation directly on GitHub:** -📚 [SurfSense Documentation](https://github.com/MODSetter/SurfSense/tree/main/surfsense_web/content/docs) - -We apologize for any inconvenience and appreciate your patience! ## How to get started? diff --git a/docker-compose.yml b/docker-compose.yml index bbf33a9b1..873de6a5f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -74,6 +74,25 @@ services: - redis - backend + celery_beat: + build: ./surfsense_backend + # image: ghcr.io/modsetter/surfsense_backend:latest + command: celery -A app.celery_app beat --loglevel=info + volumes: + - ./surfsense_backend:/app + - shared_temp:/tmp + env_file: + - ./surfsense_backend/.env + environment: + - DATABASE_URL=postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@db:5432/${POSTGRES_DB:-surfsense} + - CELERY_BROKER_URL=redis://redis:${REDIS_PORT:-6379}/0 + - CELERY_RESULT_BACKEND=redis://redis:${REDIS_PORT:-6379}/0 + - PYTHONPATH=/app + depends_on: + - db + - redis + - celery_worker + # flower: # build: ./surfsense_backend # # image: ghcr.io/modsetter/surfsense_backend:latest diff --git a/surfsense_backend/.env.example b/surfsense_backend/.env.example index 3ddfdd652..56d18c698 100644 --- a/surfsense_backend/.env.example +++ b/surfsense_backend/.env.example @@ -3,6 +3,23 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense #Celery Config CELERY_BROKER_URL=redis://localhost:6379/0 CELERY_RESULT_BACKEND=redis://localhost:6379/0 +# Periodic task interval +# # Run every minute (default) +# SCHEDULE_CHECKER_INTERVAL=1m + +# # Run every 5 minutes +# SCHEDULE_CHECKER_INTERVAL=5m + +# # Run every 10 minutes +# SCHEDULE_CHECKER_INTERVAL=10m + +# # Run every hour +# SCHEDULE_CHECKER_INTERVAL=1h + +# # Run every 2 hours +# SCHEDULE_CHECKER_INTERVAL=2h + +SCHEDULE_CHECKER_INTERVAL=5m SECRET_KEY=SECRET NEXT_FRONTEND_URL=http://localhost:3000 diff --git a/surfsense_backend/.gitignore b/surfsense_backend/.gitignore index acb5f8cd9..e9b62fbbd 100644 --- a/surfsense_backend/.gitignore +++ b/surfsense_backend/.gitignore @@ -6,4 +6,8 @@ __pycache__/ .flashrank_cache surf_new_backend.egg-info/ podcasts/ -temp_audio/ \ No newline at end of file +temp_audio/ +celerybeat-schedule* +celerybeat-schedule.* +celerybeat-schedule.dir +celerybeat-schedule.bak diff --git a/surfsense_backend/alembic/versions/25_migrate_llm_configs_to_search_spaces.py b/surfsense_backend/alembic/versions/25_migrate_llm_configs_to_search_spaces.py index c9966599c..444d8239b 100644 --- a/surfsense_backend/alembic/versions/25_migrate_llm_configs_to_search_spaces.py +++ b/surfsense_backend/alembic/versions/25_migrate_llm_configs_to_search_spaces.py @@ -55,30 +55,45 @@ def upgrade() -> None: # ===== STEP 2: Populate search_space_id with user's first search space ===== # This ensures existing LLM configs are assigned to a valid search space - op.execute( - """ - UPDATE llm_configs lc - SET search_space_id = ( - SELECT id - FROM searchspaces ss - WHERE ss.user_id = lc.user_id - ORDER BY ss.created_at ASC - LIMIT 1 + # Only run this if user_id column exists on llm_configs + if "user_id" in llm_config_columns: + op.execute( + """ + UPDATE llm_configs lc + SET search_space_id = ( + SELECT id + FROM searchspaces ss + WHERE ss.user_id = lc.user_id + ORDER BY ss.created_at ASC + LIMIT 1 + ) + WHERE search_space_id IS NULL AND user_id IS NOT NULL + """ ) - WHERE search_space_id IS NULL AND user_id IS NOT NULL - """ - ) # ===== STEP 3: Make search_space_id NOT NULL and add FK constraint ===== - op.alter_column( - "llm_configs", - "search_space_id", - nullable=False, + # Check if there are any rows with NULL search_space_id + # If llm_configs table is empty or all rows have search_space_id, we can proceed + result = conn.execute( + sa.text("SELECT COUNT(*) FROM llm_configs WHERE search_space_id IS NULL") ) + null_count = result.scalar() - # Add foreign key constraint + if null_count == 0 or "user_id" in llm_config_columns: + # Safe to make NOT NULL + op.alter_column( + "llm_configs", + "search_space_id", + nullable=False, + ) + else: + # If there are NULL values and no user_id to migrate from, skip making it NOT NULL + # This would happen if llm_configs already exists without user_id + pass + + # Add foreign key constraint only if search_space_id is NOT NULL foreign_keys = [fk["name"] for fk in inspector.get_foreign_keys("llm_configs")] - if "fk_llm_configs_search_space_id" not in foreign_keys: + if "fk_llm_configs_search_space_id" not in foreign_keys and null_count == 0: op.create_foreign_key( "fk_llm_configs_search_space_id", "llm_configs", diff --git a/surfsense_backend/alembic/versions/32_add_periodic_indexing_fields.py b/surfsense_backend/alembic/versions/32_add_periodic_indexing_fields.py new file mode 100644 index 000000000..30656739a --- /dev/null +++ b/surfsense_backend/alembic/versions/32_add_periodic_indexing_fields.py @@ -0,0 +1,94 @@ +"""Add periodic indexing fields to search_source_connectors + +Revision ID: 32 +Revises: 31 + +Changes: +1. Add periodic_indexing_enabled column (Boolean, default False) +2. Add indexing_frequency_minutes column (Integer, nullable) +3. Add next_scheduled_at column (TIMESTAMP with timezone, nullable) +""" + +from collections.abc import Sequence + +import sqlalchemy as sa + +from alembic import op + +# revision identifiers, used by Alembic. +revision: str = "32" +down_revision: str | None = "31" +branch_labels: str | Sequence[str] | None = None +depends_on: str | Sequence[str] | None = None + + +def upgrade() -> None: + """Add periodic indexing fields to search_source_connectors table.""" + + from sqlalchemy import inspect + + conn = op.get_bind() + inspector = inspect(conn) + + # Get existing columns + connector_columns = [ + col["name"] for col in inspector.get_columns("search_source_connectors") + ] + + # Add periodic_indexing_enabled column if it doesn't exist + if "periodic_indexing_enabled" not in connector_columns: + op.add_column( + "search_source_connectors", + sa.Column( + "periodic_indexing_enabled", + sa.Boolean(), + nullable=False, + server_default="false", + ), + ) + + # Add indexing_frequency_minutes column if it doesn't exist + if "indexing_frequency_minutes" not in connector_columns: + op.add_column( + "search_source_connectors", + sa.Column( + "indexing_frequency_minutes", + sa.Integer(), + nullable=True, + ), + ) + + # Add next_scheduled_at column if it doesn't exist + if "next_scheduled_at" not in connector_columns: + op.add_column( + "search_source_connectors", + sa.Column( + "next_scheduled_at", + sa.TIMESTAMP(timezone=True), + nullable=True, + ), + ) + + +def downgrade() -> None: + """Remove periodic indexing fields from search_source_connectors table.""" + + from sqlalchemy import inspect + + conn = op.get_bind() + inspector = inspect(conn) + + # Get existing columns + connector_columns = [ + col["name"] for col in inspector.get_columns("search_source_connectors") + ] + + # Drop columns if they exist + if "next_scheduled_at" in connector_columns: + op.drop_column("search_source_connectors", "next_scheduled_at") + + if "indexing_frequency_minutes" in connector_columns: + op.drop_column("search_source_connectors", "indexing_frequency_minutes") + + if "periodic_indexing_enabled" in connector_columns: + op.drop_column("search_source_connectors", "periodic_indexing_enabled") diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py index 109c8b7fc..a2a613777 100644 --- a/surfsense_backend/app/celery_app.py +++ b/surfsense_backend/app/celery_app.py @@ -3,6 +3,7 @@ import os from celery import Celery +from celery.schedules import crontab from dotenv import load_dotenv # Load environment variables @@ -12,6 +13,46 @@ load_dotenv() CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0") CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0") +# Get schedule checker interval from environment +# Format: "" where unit is 'm' (minutes) or 'h' (hours) +# Examples: "1m" (every minute), "5m" (every 5 minutes), "1h" (every hour) +SCHEDULE_CHECKER_INTERVAL = os.getenv("SCHEDULE_CHECKER_INTERVAL", "2m") + + +def parse_schedule_interval(interval: str) -> dict: + """Parse interval string into crontab parameters. + + Args: + interval: String like "1m", "5m", "1h", etc. + + Returns: + Dict with crontab parameters (minute, hour) + """ + interval = interval.strip().lower() + + # Extract number and unit + if interval.endswith("m") or interval.endswith("min"): + # Minutes + num = int(interval.rstrip("min")) + if num == 1: + return {"minute": "*", "hour": "*"} + else: + return {"minute": f"*/{num}", "hour": "*"} + elif interval.endswith("h") or interval.endswith("hour"): + # Hours + num = int(interval.rstrip("hour")) + if num == 1: + return {"minute": "0", "hour": "*"} + else: + return {"minute": "0", "hour": f"*/{num}"} + else: + # Default to every minute if parsing fails + return {"minute": "*", "hour": "*"} + + +# Parse the schedule interval +schedule_params = parse_schedule_interval(SCHEDULE_CHECKER_INTERVAL) + # Create Celery app celery_app = Celery( "surfsense", @@ -21,6 +62,7 @@ celery_app = Celery( "app.tasks.celery_tasks.document_tasks", "app.tasks.celery_tasks.podcast_tasks", "app.tasks.celery_tasks.connector_tasks", + "app.tasks.celery_tasks.schedule_checker_task", ], ) @@ -47,13 +89,20 @@ celery_app.conf.update( task_reject_on_worker_lost=True, # Broker settings broker_connection_retry_on_startup=True, + # Beat scheduler settings + beat_max_loop_interval=60, # Check every minute ) -# Optional: Configure Celery Beat for periodic tasks +# Configure Celery Beat schedule +# This uses a meta-scheduler pattern: instead of creating individual Beat schedules +# for each connector, we have ONE schedule that checks the database at the configured interval +# for connectors that need indexing. This provides dynamic scheduling without restarts. celery_app.conf.beat_schedule = { - # Example: Add periodic tasks here if needed - # "periodic-task-name": { - # "task": "app.tasks.celery_tasks.some_task", - # "schedule": crontab(minute=0, hour=0), # Run daily at midnight - # }, + "check-periodic-connector-schedules": { + "task": "check_periodic_schedules", + "schedule": crontab(**schedule_params), + "options": { + "expires": 30, # Task expires after 30 seconds if not picked up + }, + }, } diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py index db5ea73ea..5183adc93 100644 --- a/surfsense_backend/app/db.py +++ b/surfsense_backend/app/db.py @@ -285,6 +285,11 @@ class SearchSourceConnector(BaseModel, TimestampMixin): last_indexed_at = Column(TIMESTAMP(timezone=True), nullable=True) config = Column(JSON, nullable=False) + # Periodic indexing fields + periodic_indexing_enabled = Column(Boolean, nullable=False, default=False) + indexing_frequency_minutes = Column(Integer, nullable=True) + next_scheduled_at = Column(TIMESTAMP(timezone=True), nullable=True) + search_space_id = Column( Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False ) diff --git a/surfsense_backend/app/routes/search_source_connectors_routes.py b/surfsense_backend/app/routes/search_source_connectors_routes.py index d0ee44f51..bd24efd49 100644 --- a/surfsense_backend/app/routes/search_source_connectors_routes.py +++ b/surfsense_backend/app/routes/search_source_connectors_routes.py @@ -11,7 +11,7 @@ Note: Each search space can have only one connector of each type per user (based """ import logging -from datetime import datetime, timedelta +from datetime import UTC, datetime, timedelta from typing import Any from fastapi import APIRouter, Depends, HTTPException, Query @@ -52,6 +52,11 @@ from app.tasks.connector_indexers import ( ) from app.users import current_active_user from app.utils.check_ownership import check_ownership +from app.utils.periodic_scheduler import ( + create_periodic_schedule, + delete_periodic_schedule, + update_periodic_schedule, +) # Set up logging logger = logging.getLogger(__name__) @@ -124,12 +129,44 @@ async def create_search_source_connector( status_code=409, detail=f"A connector with type {connector.connector_type} already exists in this search space. Each search space can have only one connector of each type per user.", ) + + # Prepare connector data + connector_data = connector.model_dump() + + # Automatically set next_scheduled_at if periodic indexing is enabled + if ( + connector.periodic_indexing_enabled + and connector.indexing_frequency_minutes + and connector.next_scheduled_at is None + ): + connector_data["next_scheduled_at"] = datetime.now(UTC) + timedelta( + minutes=connector.indexing_frequency_minutes + ) + db_connector = SearchSourceConnector( - **connector.model_dump(), search_space_id=search_space_id, user_id=user.id + **connector_data, search_space_id=search_space_id, user_id=user.id ) session.add(db_connector) await session.commit() await session.refresh(db_connector) + + # Create periodic schedule if periodic indexing is enabled + if ( + db_connector.periodic_indexing_enabled + and db_connector.indexing_frequency_minutes + ): + success = create_periodic_schedule( + connector_id=db_connector.id, + search_space_id=search_space_id, + user_id=str(user.id), + connector_type=db_connector.connector_type, + frequency_minutes=db_connector.indexing_frequency_minutes, + ) + if not success: + logger.warning( + f"Failed to create periodic schedule for connector {db_connector.id}" + ) + return db_connector except ValidationError as e: await session.rollback() @@ -224,6 +261,50 @@ async def update_search_source_connector( # Convert the sparse update data (only fields present in request) to a dict update_data = connector_update.model_dump(exclude_unset=True) + # Validate periodic indexing fields + # Get the effective values after update + effective_is_indexable = update_data.get("is_indexable", db_connector.is_indexable) + effective_periodic_enabled = update_data.get( + "periodic_indexing_enabled", db_connector.periodic_indexing_enabled + ) + effective_frequency = update_data.get( + "indexing_frequency_minutes", db_connector.indexing_frequency_minutes + ) + + # Validate periodic indexing configuration + if effective_periodic_enabled: + if not effective_is_indexable: + raise HTTPException( + status_code=422, + detail="periodic_indexing_enabled can only be True for indexable connectors", + ) + if effective_frequency is None: + raise HTTPException( + status_code=422, + detail="indexing_frequency_minutes is required when periodic_indexing_enabled is True", + ) + if effective_frequency <= 0: + raise HTTPException( + status_code=422, + detail="indexing_frequency_minutes must be greater than 0", + ) + + # Automatically set next_scheduled_at if not provided and periodic indexing is being enabled + if ( + "periodic_indexing_enabled" in update_data + or "indexing_frequency_minutes" in update_data + ) and "next_scheduled_at" not in update_data: + # Schedule the next indexing based on the frequency + update_data["next_scheduled_at"] = datetime.now(UTC) + timedelta( + minutes=effective_frequency + ) + elif ( + effective_periodic_enabled is False + and "periodic_indexing_enabled" in update_data + ): + # If disabling periodic indexing, clear the next_scheduled_at + update_data["next_scheduled_at"] = None + # Special handling for 'config' field if "config" in update_data: incoming_config = update_data["config"] # Config data from the request @@ -290,6 +371,36 @@ async def update_search_source_connector( try: await session.commit() await session.refresh(db_connector) + + # Handle periodic schedule updates + if ( + "periodic_indexing_enabled" in update_data + or "indexing_frequency_minutes" in update_data + ): + if ( + db_connector.periodic_indexing_enabled + and db_connector.indexing_frequency_minutes + ): + # Create or update the periodic schedule + success = update_periodic_schedule( + connector_id=db_connector.id, + search_space_id=db_connector.search_space_id, + user_id=str(user.id), + connector_type=db_connector.connector_type, + frequency_minutes=db_connector.indexing_frequency_minutes, + ) + if not success: + logger.warning( + f"Failed to update periodic schedule for connector {db_connector.id}" + ) + else: + # Delete the periodic schedule if disabled + success = delete_periodic_schedule(db_connector.id) + if not success: + logger.warning( + f"Failed to delete periodic schedule for connector {db_connector.id}" + ) + return db_connector except IntegrityError as e: await session.rollback() @@ -320,6 +431,15 @@ async def delete_search_source_connector( db_connector = await check_ownership( session, SearchSourceConnector, connector_id, user ) + + # Delete any periodic schedule associated with this connector + if db_connector.periodic_indexing_enabled: + success = delete_periodic_schedule(connector_id) + if not success: + logger.warning( + f"Failed to delete periodic schedule for connector {connector_id}" + ) + await session.delete(db_connector) await session.commit() return {"message": "Search source connector deleted successfully"} diff --git a/surfsense_backend/app/schemas/search_source_connector.py b/surfsense_backend/app/schemas/search_source_connector.py index b84eeb07d..1e8a7a38d 100644 --- a/surfsense_backend/app/schemas/search_source_connector.py +++ b/surfsense_backend/app/schemas/search_source_connector.py @@ -2,7 +2,7 @@ import uuid from datetime import datetime from typing import Any -from pydantic import BaseModel, ConfigDict, field_validator +from pydantic import BaseModel, ConfigDict, field_validator, model_validator from app.db import SearchSourceConnectorType from app.utils.validators import validate_connector_config @@ -16,6 +16,9 @@ class SearchSourceConnectorBase(BaseModel): is_indexable: bool last_indexed_at: datetime | None = None config: dict[str, Any] + periodic_indexing_enabled: bool = False + indexing_frequency_minutes: int | None = None + next_scheduled_at: datetime | None = None @field_validator("config") @classmethod @@ -25,6 +28,22 @@ class SearchSourceConnectorBase(BaseModel): connector_type = values.data.get("connector_type") return validate_connector_config(connector_type, config) + @model_validator(mode="after") + def validate_periodic_indexing(self): + """Validate that periodic indexing configuration is consistent.""" + if self.periodic_indexing_enabled: + if not self.is_indexable: + raise ValueError( + "periodic_indexing_enabled can only be True for indexable connectors" + ) + if self.indexing_frequency_minutes is None: + raise ValueError( + "indexing_frequency_minutes is required when periodic_indexing_enabled is True" + ) + if self.indexing_frequency_minutes <= 0: + raise ValueError("indexing_frequency_minutes must be greater than 0") + return self + class SearchSourceConnectorCreate(SearchSourceConnectorBase): pass @@ -36,6 +55,9 @@ class SearchSourceConnectorUpdate(BaseModel): is_indexable: bool | None = None last_indexed_at: datetime | None = None config: dict[str, Any] | None = None + periodic_indexing_enabled: bool | None = None + indexing_frequency_minutes: int | None = None + next_scheduled_at: datetime | None = None class SearchSourceConnectorRead(SearchSourceConnectorBase, IDModel, TimestampModel): diff --git a/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py new file mode 100644 index 000000000..39d6bf840 --- /dev/null +++ b/surfsense_backend/app/tasks/celery_tasks/schedule_checker_task.py @@ -0,0 +1,129 @@ +"""Meta-scheduler task that checks for connectors needing periodic indexing.""" + +import logging +from datetime import UTC, datetime + +from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine +from sqlalchemy.future import select +from sqlalchemy.pool import NullPool + +from app.celery_app import celery_app +from app.config import config +from app.db import SearchSourceConnector, SearchSourceConnectorType + +logger = logging.getLogger(__name__) + + +def get_celery_session_maker(): + """Create async session maker for Celery tasks.""" + engine = create_async_engine( + config.DATABASE_URL, + poolclass=NullPool, + echo=False, + ) + return async_sessionmaker(engine, expire_on_commit=False) + + +@celery_app.task(name="check_periodic_schedules") +def check_periodic_schedules_task(): + """ + Check all connectors for periodic indexing that's due. + This task runs every minute and triggers indexing for any connector + whose next_scheduled_at time has passed. + """ + import asyncio + + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + try: + loop.run_until_complete(_check_and_trigger_schedules()) + finally: + loop.close() + + +async def _check_and_trigger_schedules(): + """Check database for connectors that need indexing and trigger their tasks.""" + async with get_celery_session_maker()() as session: + try: + # Find all connectors with periodic indexing enabled that are due + now = datetime.now(UTC) + result = await session.execute( + select(SearchSourceConnector).filter( + SearchSourceConnector.periodic_indexing_enabled == True, # noqa: E712 + SearchSourceConnector.next_scheduled_at <= now, + ) + ) + due_connectors = result.scalars().all() + + if not due_connectors: + logger.debug("No connectors due for periodic indexing") + return + + logger.info(f"Found {len(due_connectors)} connectors due for indexing") + + # Import all indexing tasks + from app.tasks.celery_tasks.connector_tasks import ( + index_airtable_records_task, + index_clickup_tasks_task, + index_confluence_pages_task, + index_discord_messages_task, + index_elasticsearch_documents_task, + index_github_repos_task, + index_google_calendar_events_task, + index_google_gmail_messages_task, + index_jira_issues_task, + index_linear_issues_task, + index_luma_events_task, + index_notion_pages_task, + index_slack_messages_task, + ) + + # Map connector types to their tasks + task_map = { + SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task, + SearchSourceConnectorType.NOTION_CONNECTOR: index_notion_pages_task, + SearchSourceConnectorType.GITHUB_CONNECTOR: index_github_repos_task, + SearchSourceConnectorType.LINEAR_CONNECTOR: index_linear_issues_task, + SearchSourceConnectorType.JIRA_CONNECTOR: index_jira_issues_task, + SearchSourceConnectorType.CONFLUENCE_CONNECTOR: index_confluence_pages_task, + SearchSourceConnectorType.CLICKUP_CONNECTOR: index_clickup_tasks_task, + SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task, + SearchSourceConnectorType.AIRTABLE_CONNECTOR: index_airtable_records_task, + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: index_google_gmail_messages_task, + SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task, + SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task, + SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, + } + + # Trigger indexing for each due connector + for connector in due_connectors: + task = task_map.get(connector.connector_type) + if task: + logger.info( + f"Triggering periodic indexing for connector {connector.id} " + f"({connector.connector_type.value})" + ) + task.delay( + connector.id, + connector.search_space_id, + str(connector.user_id), + None, # start_date - uses last_indexed_at + None, # end_date - uses now + ) + + # Update next_scheduled_at for next run + from datetime import timedelta + + connector.next_scheduled_at = now + timedelta( + minutes=connector.indexing_frequency_minutes + ) + await session.commit() + else: + logger.warning( + f"No task found for connector type {connector.connector_type}" + ) + + except Exception as e: + logger.error(f"Error checking periodic schedules: {e!s}", exc_info=True) + await session.rollback() diff --git a/surfsense_backend/app/utils/periodic_scheduler.py b/surfsense_backend/app/utils/periodic_scheduler.py new file mode 100644 index 000000000..225425714 --- /dev/null +++ b/surfsense_backend/app/utils/periodic_scheduler.py @@ -0,0 +1,171 @@ +""" +Utility functions for managing periodic connector indexing schedules. + +This module uses a meta-scheduler pattern instead of RedBeat's dynamic schedule creation. +Instead of creating individual Beat schedules for each connector, we: +1. Store schedule configuration in the database (next_scheduled_at, frequency) +2. Have ONE Beat task that runs every minute checking for due connectors +3. Trigger indexing tasks for connectors whose next_scheduled_at has passed + +This avoids RedBeat's limitation where new schedules aren't discovered without restart. +""" + +import logging + +from app.db import SearchSourceConnectorType + +logger = logging.getLogger(__name__) + +# Mapping of connector types to their corresponding Celery task names +CONNECTOR_TASK_MAP = { + SearchSourceConnectorType.SLACK_CONNECTOR: "index_slack_messages", + SearchSourceConnectorType.NOTION_CONNECTOR: "index_notion_pages", + SearchSourceConnectorType.GITHUB_CONNECTOR: "index_github_repos", + SearchSourceConnectorType.LINEAR_CONNECTOR: "index_linear_issues", + SearchSourceConnectorType.JIRA_CONNECTOR: "index_jira_issues", + SearchSourceConnectorType.CONFLUENCE_CONNECTOR: "index_confluence_pages", + SearchSourceConnectorType.CLICKUP_CONNECTOR: "index_clickup_tasks", + SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: "index_google_calendar_events", + SearchSourceConnectorType.AIRTABLE_CONNECTOR: "index_airtable_records", + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: "index_google_gmail_messages", + SearchSourceConnectorType.DISCORD_CONNECTOR: "index_discord_messages", + SearchSourceConnectorType.LUMA_CONNECTOR: "index_luma_events", + SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: "index_elasticsearch_documents", +} + + +def create_periodic_schedule( + connector_id: int, + search_space_id: int, + user_id: str, + connector_type: SearchSourceConnectorType, + frequency_minutes: int, +) -> bool: + """ + Trigger the first indexing run immediately when periodic indexing is enabled. + + Note: The periodic schedule is managed by the database (next_scheduled_at field) + and checked by the meta-scheduler task that runs every minute. + This function just triggers the first run for immediate feedback. + + Args: + connector_id: ID of the connector + search_space_id: ID of the search space + user_id: User ID + connector_type: Type of connector + frequency_minutes: Frequency in minutes (used for logging) + + Returns: + True if successful, False otherwise + """ + try: + logger.info( + f"Periodic indexing enabled for connector {connector_id} " + f"(frequency: {frequency_minutes} minutes). Triggering first run..." + ) + + # Import all indexing tasks + from app.tasks.celery_tasks.connector_tasks import ( + index_airtable_records_task, + index_clickup_tasks_task, + index_confluence_pages_task, + index_discord_messages_task, + index_elasticsearch_documents_task, + index_github_repos_task, + index_google_calendar_events_task, + index_google_gmail_messages_task, + index_jira_issues_task, + index_linear_issues_task, + index_luma_events_task, + index_notion_pages_task, + index_slack_messages_task, + ) + + # Map connector type to task + task_map = { + SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task, + SearchSourceConnectorType.NOTION_CONNECTOR: index_notion_pages_task, + SearchSourceConnectorType.GITHUB_CONNECTOR: index_github_repos_task, + SearchSourceConnectorType.LINEAR_CONNECTOR: index_linear_issues_task, + SearchSourceConnectorType.JIRA_CONNECTOR: index_jira_issues_task, + SearchSourceConnectorType.CONFLUENCE_CONNECTOR: index_confluence_pages_task, + SearchSourceConnectorType.CLICKUP_CONNECTOR: index_clickup_tasks_task, + SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task, + SearchSourceConnectorType.AIRTABLE_CONNECTOR: index_airtable_records_task, + SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: index_google_gmail_messages_task, + SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task, + SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task, + SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task, + } + + # Trigger the first run immediately + task = task_map.get(connector_type) + if task: + task.delay(connector_id, search_space_id, user_id, None, None) + logger.info( + f"✓ First indexing run triggered for connector {connector_id}. " + f"Periodic indexing will continue automatically every {frequency_minutes} minutes." + ) + else: + logger.error(f"No task mapping found for connector type: {connector_type}") + return False + + return True + + except Exception as e: + logger.error( + f"Failed to trigger initial indexing for connector {connector_id}: {e!s}", + exc_info=True, + ) + return False + + +def delete_periodic_schedule(connector_id: int) -> bool: + """ + Handle deletion of periodic schedule for a connector. + + Note: With the meta-scheduler pattern, the schedule is managed in the database. + The next_scheduled_at field being set to None effectively disables it. + This function just logs the action. + + Args: + connector_id: ID of the connector + + Returns: + True (always successful since database handles the state) + """ + logger.info(f"Periodic indexing disabled for connector {connector_id}") + return True + + +def update_periodic_schedule( + connector_id: int, + search_space_id: int, + user_id: str, + connector_type: SearchSourceConnectorType, + frequency_minutes: int, +) -> bool: + """ + Update an existing periodic schedule for a connector. + + Note: With the meta-scheduler pattern, updates are handled by the database. + This function logs the update and optionally triggers an immediate run. + + Args: + connector_id: ID of the connector + search_space_id: ID of the search space + user_id: User ID + connector_type: Type of connector + frequency_minutes: New frequency in minutes + + Returns: + True if successful, False otherwise + """ + logger.info( + f"Periodic indexing schedule updated for connector {connector_id} " + f"(new frequency: {frequency_minutes} minutes)" + ) + # Optionally trigger an immediate run with the new schedule + # Uncomment the line below if you want immediate execution on schedule update + # return create_periodic_schedule(connector_id, search_space_id, user_id, connector_type, frequency_minutes) + return True diff --git a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx b/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx index eb1d53895..3537885ca 100644 --- a/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx +++ b/surfsense_web/app/dashboard/[search_space_id]/connectors/(manage)/page.tsx @@ -1,7 +1,15 @@ "use client"; import { format } from "date-fns"; -import { Calendar as CalendarIcon, Edit, Plus, RefreshCw, Trash2 } from "lucide-react"; +import { + Calendar as CalendarIcon, + Clock, + Edit, + Loader2, + Plus, + RefreshCw, + Trash2, +} from "lucide-react"; import { motion } from "motion/react"; import { useParams, useRouter } from "next/navigation"; import { useEffect, useState } from "react"; @@ -28,8 +36,17 @@ import { DialogHeader, DialogTitle, } from "@/components/ui/dialog"; +import { Input } from "@/components/ui/input"; import { Label } from "@/components/ui/label"; import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover"; +import { + Select, + SelectContent, + SelectItem, + SelectTrigger, + SelectValue, +} from "@/components/ui/select"; +import { Switch } from "@/components/ui/switch"; import { Table, TableBody, @@ -64,7 +81,7 @@ export default function ConnectorsPage() { const searchSpaceId = params.search_space_id as string; const today = new Date(); - const { connectors, isLoading, error, deleteConnector, indexConnector } = + const { connectors, isLoading, error, deleteConnector, indexConnector, updateConnector } = useSearchSourceConnectors(false, parseInt(searchSpaceId)); const [connectorToDelete, setConnectorToDelete] = useState(null); const [indexingConnectorId, setIndexingConnectorId] = useState(null); @@ -75,6 +92,16 @@ export default function ConnectorsPage() { const [startDate, setStartDate] = useState(undefined); const [endDate, setEndDate] = useState(undefined); + // Periodic indexing state + const [periodicDialogOpen, setPeriodicDialogOpen] = useState(false); + const [selectedConnectorForPeriodic, setSelectedConnectorForPeriodic] = useState( + null + ); + const [periodicEnabled, setPeriodicEnabled] = useState(false); + const [frequencyMinutes, setFrequencyMinutes] = useState("1440"); + const [customFrequency, setCustomFrequency] = useState(""); + const [isSavingPeriodic, setIsSavingPeriodic] = useState(false); + useEffect(() => { if (error) { toast.error("Failed to load connectors"); @@ -141,6 +168,84 @@ export default function ConnectorsPage() { } }; + // Handle opening periodic indexing dialog + const handleOpenPeriodicDialog = (connectorId: number) => { + const connector = connectors.find((c) => c.id === connectorId); + if (!connector) return; + + setSelectedConnectorForPeriodic(connectorId); + setPeriodicEnabled(connector.periodic_indexing_enabled); + + if (connector.indexing_frequency_minutes) { + // Check if it's a preset value + const presetValues = ["15", "60", "360", "720", "1440", "10080"]; + if (presetValues.includes(connector.indexing_frequency_minutes.toString())) { + setFrequencyMinutes(connector.indexing_frequency_minutes.toString()); + setCustomFrequency(""); + } else { + setFrequencyMinutes("custom"); + setCustomFrequency(connector.indexing_frequency_minutes.toString()); + } + } else { + setFrequencyMinutes("1440"); + setCustomFrequency(""); + } + + setPeriodicDialogOpen(true); + }; + + // Handle saving periodic indexing configuration + const handleSavePeriodicIndexing = async () => { + if (selectedConnectorForPeriodic === null) return; + + const connector = connectors.find((c) => c.id === selectedConnectorForPeriodic); + if (!connector) return; + + setIsSavingPeriodic(true); + try { + // Determine the frequency value + let frequency: number | null = null; + if (periodicEnabled) { + if (frequencyMinutes === "custom") { + frequency = parseInt(customFrequency, 10); + if (isNaN(frequency) || frequency <= 0) { + toast.error("Please enter a valid frequency in minutes"); + setIsSavingPeriodic(false); + return; + } + } else { + frequency = parseInt(frequencyMinutes, 10); + } + } + + await updateConnector(selectedConnectorForPeriodic, { + periodic_indexing_enabled: periodicEnabled, + indexing_frequency_minutes: frequency, + }); + + toast.success( + periodicEnabled + ? "Periodic indexing enabled successfully" + : "Periodic indexing disabled successfully" + ); + setPeriodicDialogOpen(false); + } catch (error) { + console.error("Error updating periodic indexing:", error); + toast.error(error instanceof Error ? error.message : "Failed to update periodic indexing"); + } finally { + setIsSavingPeriodic(false); + setSelectedConnectorForPeriodic(null); + } + }; + + // Format frequency for display + const formatFrequency = (minutes: number): string => { + if (minutes < 60) return `${minutes}m`; + if (minutes < 1440) return `${Math.floor(minutes / 60)}h`; + if (minutes < 10080) return `${Math.floor(minutes / 1440)}d`; + return `${Math.floor(minutes / 10080)}w`; + }; + return (
Name Type Last Indexed + Periodic Actions @@ -206,6 +312,41 @@ export default function ConnectorsPage() { ? formatDateTime(connector.last_indexed_at) : "Not indexable"} + + {connector.is_indexable ? ( + connector.periodic_indexing_enabled ? ( + + + +
+ + + {connector.indexing_frequency_minutes + ? formatFrequency(connector.indexing_frequency_minutes) + : "Enabled"} + +
+
+ +

+ Runs every {connector.indexing_frequency_minutes} minutes + {connector.next_scheduled_at && ( + <> +
+ Next: {formatDateTime(connector.next_scheduled_at)} + + )} +

+
+
+
+ ) : ( + Disabled + ) + ) : ( + - + )} +
{connector.is_indexable && ( @@ -256,6 +397,25 @@ export default function ConnectorsPage() {
)} + {connector.is_indexable && ( + + + + + + +

Configure Periodic Indexing

+
+
+
+ )} + + + +
); } diff --git a/surfsense_web/components/homepage/navbar.tsx b/surfsense_web/components/homepage/navbar.tsx index 1341431f5..d3a2e4c4f 100644 --- a/surfsense_web/components/homepage/navbar.tsx +++ b/surfsense_web/components/homepage/navbar.tsx @@ -129,11 +129,18 @@ const MobileNav = ({ navItems, isScrolled }: any) => { SurfSense - {open ? ( - setOpen(!open)} /> - ) : ( - setOpen(!open)} /> - )} + @@ -155,10 +162,10 @@ const MobileNav = ({ navItems, isScrolled }: any) => { ))}
@@ -166,7 +173,7 @@ const MobileNav = ({ navItems, isScrolled }: any) => { href="https://github.com/MODSetter/SurfSense" target="_blank" rel="noopener noreferrer" - className="flex items-center gap-1.5 rounded-lg px-3 py-2 hover:bg-gray-100 dark:hover:bg-neutral-800 transition-colors" + className="flex items-center gap-1.5 rounded-lg px-3 py-2 hover:bg-gray-100 dark:hover:bg-neutral-800 transition-colors touch-manipulation" > {loadingGithubStars ? ( @@ -179,12 +186,12 @@ const MobileNav = ({ navItems, isScrolled }: any) => {
- + )}
diff --git a/surfsense_web/content/docs/docker-installation.mdx b/surfsense_web/content/docs/docker-installation.mdx index 245b95d61..0394bd2c6 100644 --- a/surfsense_web/content/docs/docker-installation.mdx +++ b/surfsense_web/content/docs/docker-installation.mdx @@ -98,6 +98,8 @@ Before you begin, ensure you have: | LLAMA_CLOUD_API_KEY | API key for LlamaCloud service for document parsing (required if ETL_SERVICE=LLAMACLOUD) | | CELERY_BROKER_URL | Redis connection URL for Celery broker (e.g., `redis://localhost:6379/0`) | | CELERY_RESULT_BACKEND | Redis connection URL for Celery result backend (e.g., `redis://localhost:6379/0`) | +| SCHEDULE_CHECKER_INTERVAL | (Optional) How often to check for scheduled connector tasks. Format: `` where unit is `m` (minutes) or `h` (hours). Examples: `1m`, `5m`, `1h`, `2h` (default: `1m`) | +| REGISTRATION_ENABLED | (Optional) Enable or disable new user registration (e.g., `TRUE` or `FALSE`, default: `TRUE`) | **Optional Backend LangSmith Observability:** @@ -181,6 +183,22 @@ For more details, see the [Uvicorn documentation](https://www.uvicorn.org/#comma - API Documentation: [http://localhost:8000/docs](http://localhost:8000/docs) - pgAdmin: [http://localhost:5050](http://localhost:5050) +## Docker Services Overview + +The Docker setup includes several services that work together: + +- **Backend**: FastAPI application server +- **Frontend**: Next.js web application +- **PostgreSQL (db)**: Database with pgvector extension +- **Redis**: Message broker for Celery +- **Celery Worker**: Handles background tasks (document processing, indexing, etc.) +- **Celery Beat**: Scheduler for periodic tasks (enables scheduled connector indexing) + - The schedule interval can be configured using the `SCHEDULE_CHECKER_INTERVAL` environment variable in your backend `.env` file + - Default: checks every minute for connectors that need indexing +- **pgAdmin**: Database management interface + +All services start automatically with `docker compose up`. The Celery Beat service ensures that periodic indexing functionality works out of the box. + ## Using pgAdmin pgAdmin is included in the Docker setup to help manage your PostgreSQL database. To connect: diff --git a/surfsense_web/content/docs/manual-installation.mdx b/surfsense_web/content/docs/manual-installation.mdx index 822b7f55e..a81e531ce 100644 --- a/surfsense_web/content/docs/manual-installation.mdx +++ b/surfsense_web/content/docs/manual-installation.mdx @@ -86,6 +86,8 @@ Edit the `.env` file and set the following variables: | LLAMA_CLOUD_API_KEY | API key for LlamaCloud service for document parsing (required if ETL_SERVICE=LLAMACLOUD) | | CELERY_BROKER_URL | Redis connection URL for Celery broker (e.g., `redis://localhost:6379/0`) | | CELERY_RESULT_BACKEND | Redis connection URL for Celery result backend (e.g., `redis://localhost:6379/0`) | +| SCHEDULE_CHECKER_INTERVAL | (Optional) How often to check for scheduled connector tasks. Format: `` where unit is `m` (minutes) or `h` (hours). Examples: `1m`, `5m`, `1h`, `2h` (default: `1m`) | +| REGISTRATION_ENABLED | (Optional) Enable or disable new user registration (e.g., `TRUE` or `FALSE`, default: `TRUE`) | **(Optional) Backend LangSmith Observability:** @@ -249,7 +251,23 @@ uv run celery -A celery_worker.celery_app flower --port=5555 Access Flower at [http://localhost:5555](http://localhost:5555) to monitor your Celery tasks. -### 5. Run the Backend +### 5. Start Celery Beat (Scheduler) + +In another new terminal window, start Celery Beat to enable periodic tasks (like scheduled connector indexing): + +**Linux/macOS/Windows:** + +```bash +# Make sure you're in the surfsense_backend directory +cd surfsense_backend + +# Start Celery Beat +uv run celery -A celery_worker.celery_app beat --loglevel=info +``` + +**Important**: Celery Beat is required for the periodic indexing functionality to work. Without it, scheduled connector tasks won't run automatically. The schedule interval can be configured using the `SCHEDULE_CHECKER_INTERVAL` environment variable. + +### 6. Run the Backend Start the backend server: diff --git a/surfsense_web/hooks/use-search-source-connectors.ts b/surfsense_web/hooks/use-search-source-connectors.ts index 5a45fd761..752eda978 100644 --- a/surfsense_web/hooks/use-search-source-connectors.ts +++ b/surfsense_web/hooks/use-search-source-connectors.ts @@ -10,6 +10,9 @@ export interface SearchSourceConnector { search_space_id: number; user_id?: string; created_at?: string; + periodic_indexing_enabled: boolean; + indexing_frequency_minutes: number | null; + next_scheduled_at: string | null; } export interface ConnectorSourceItem {