Merge pull request #428 from MODSetter/dev

feat: added periodic indexing for indexable search source connectors
This commit is contained in:
Rohan Verma 2025-10-23 01:04:28 -07:00 committed by GitHub
commit a44deb7635
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
17 changed files with 998 additions and 51 deletions

View file

@ -136,14 +136,6 @@ Check out our public roadmap and contribute your ideas or feedback:
**View the Roadmap:** [SurfSense Roadmap on GitHub Projects](https://github.com/users/MODSetter/projects/2)
## ⚠️ Important Announcement
**AWS and Vercel are currently experiencing outages.** We deployed a major update to SurfSense last night and have updated our documentation accordingly with important setup and configuration changes. Unfortunately, these documentation updates cannot be deployed to our main site (surfsense.com) due to the ongoing outages.
**Please view our documentation directly on GitHub:**
📚 [SurfSense Documentation](https://github.com/MODSetter/SurfSense/tree/main/surfsense_web/content/docs)
We apologize for any inconvenience and appreciate your patience!
## How to get started?

View file

@ -74,6 +74,25 @@ services:
- redis
- backend
celery_beat:
build: ./surfsense_backend
# image: ghcr.io/modsetter/surfsense_backend:latest
command: celery -A app.celery_app beat --loglevel=info
volumes:
- ./surfsense_backend:/app
- shared_temp:/tmp
env_file:
- ./surfsense_backend/.env
environment:
- DATABASE_URL=postgresql+asyncpg://${POSTGRES_USER:-postgres}:${POSTGRES_PASSWORD:-postgres}@db:5432/${POSTGRES_DB:-surfsense}
- CELERY_BROKER_URL=redis://redis:${REDIS_PORT:-6379}/0
- CELERY_RESULT_BACKEND=redis://redis:${REDIS_PORT:-6379}/0
- PYTHONPATH=/app
depends_on:
- db
- redis
- celery_worker
# flower:
# build: ./surfsense_backend
# # image: ghcr.io/modsetter/surfsense_backend:latest

View file

@ -3,6 +3,23 @@ DATABASE_URL=postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense
#Celery Config
CELERY_BROKER_URL=redis://localhost:6379/0
CELERY_RESULT_BACKEND=redis://localhost:6379/0
# Periodic task interval
# # Run every minute (default)
# SCHEDULE_CHECKER_INTERVAL=1m
# # Run every 5 minutes
# SCHEDULE_CHECKER_INTERVAL=5m
# # Run every 10 minutes
# SCHEDULE_CHECKER_INTERVAL=10m
# # Run every hour
# SCHEDULE_CHECKER_INTERVAL=1h
# # Run every 2 hours
# SCHEDULE_CHECKER_INTERVAL=2h
SCHEDULE_CHECKER_INTERVAL=5m
SECRET_KEY=SECRET
NEXT_FRONTEND_URL=http://localhost:3000

View file

@ -6,4 +6,8 @@ __pycache__/
.flashrank_cache
surf_new_backend.egg-info/
podcasts/
temp_audio/
temp_audio/
celerybeat-schedule*
celerybeat-schedule.*
celerybeat-schedule.dir
celerybeat-schedule.bak

View file

@ -55,30 +55,45 @@ def upgrade() -> None:
# ===== STEP 2: Populate search_space_id with user's first search space =====
# This ensures existing LLM configs are assigned to a valid search space
op.execute(
"""
UPDATE llm_configs lc
SET search_space_id = (
SELECT id
FROM searchspaces ss
WHERE ss.user_id = lc.user_id
ORDER BY ss.created_at ASC
LIMIT 1
# Only run this if user_id column exists on llm_configs
if "user_id" in llm_config_columns:
op.execute(
"""
UPDATE llm_configs lc
SET search_space_id = (
SELECT id
FROM searchspaces ss
WHERE ss.user_id = lc.user_id
ORDER BY ss.created_at ASC
LIMIT 1
)
WHERE search_space_id IS NULL AND user_id IS NOT NULL
"""
)
WHERE search_space_id IS NULL AND user_id IS NOT NULL
"""
)
# ===== STEP 3: Make search_space_id NOT NULL and add FK constraint =====
op.alter_column(
"llm_configs",
"search_space_id",
nullable=False,
# Check if there are any rows with NULL search_space_id
# If llm_configs table is empty or all rows have search_space_id, we can proceed
result = conn.execute(
sa.text("SELECT COUNT(*) FROM llm_configs WHERE search_space_id IS NULL")
)
null_count = result.scalar()
# Add foreign key constraint
if null_count == 0 or "user_id" in llm_config_columns:
# Safe to make NOT NULL
op.alter_column(
"llm_configs",
"search_space_id",
nullable=False,
)
else:
# If there are NULL values and no user_id to migrate from, skip making it NOT NULL
# This would happen if llm_configs already exists without user_id
pass
# Add foreign key constraint only if search_space_id is NOT NULL
foreign_keys = [fk["name"] for fk in inspector.get_foreign_keys("llm_configs")]
if "fk_llm_configs_search_space_id" not in foreign_keys:
if "fk_llm_configs_search_space_id" not in foreign_keys and null_count == 0:
op.create_foreign_key(
"fk_llm_configs_search_space_id",
"llm_configs",

View file

@ -0,0 +1,94 @@
"""Add periodic indexing fields to search_source_connectors
Revision ID: 32
Revises: 31
Changes:
1. Add periodic_indexing_enabled column (Boolean, default False)
2. Add indexing_frequency_minutes column (Integer, nullable)
3. Add next_scheduled_at column (TIMESTAMP with timezone, nullable)
"""
from collections.abc import Sequence
import sqlalchemy as sa
from alembic import op
# revision identifiers, used by Alembic.
revision: str = "32"
down_revision: str | None = "31"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
"""Add periodic indexing fields to search_source_connectors table."""
from sqlalchemy import inspect
conn = op.get_bind()
inspector = inspect(conn)
# Get existing columns
connector_columns = [
col["name"] for col in inspector.get_columns("search_source_connectors")
]
# Add periodic_indexing_enabled column if it doesn't exist
if "periodic_indexing_enabled" not in connector_columns:
op.add_column(
"search_source_connectors",
sa.Column(
"periodic_indexing_enabled",
sa.Boolean(),
nullable=False,
server_default="false",
),
)
# Add indexing_frequency_minutes column if it doesn't exist
if "indexing_frequency_minutes" not in connector_columns:
op.add_column(
"search_source_connectors",
sa.Column(
"indexing_frequency_minutes",
sa.Integer(),
nullable=True,
),
)
# Add next_scheduled_at column if it doesn't exist
if "next_scheduled_at" not in connector_columns:
op.add_column(
"search_source_connectors",
sa.Column(
"next_scheduled_at",
sa.TIMESTAMP(timezone=True),
nullable=True,
),
)
def downgrade() -> None:
"""Remove periodic indexing fields from search_source_connectors table."""
from sqlalchemy import inspect
conn = op.get_bind()
inspector = inspect(conn)
# Get existing columns
connector_columns = [
col["name"] for col in inspector.get_columns("search_source_connectors")
]
# Drop columns if they exist
if "next_scheduled_at" in connector_columns:
op.drop_column("search_source_connectors", "next_scheduled_at")
if "indexing_frequency_minutes" in connector_columns:
op.drop_column("search_source_connectors", "indexing_frequency_minutes")
if "periodic_indexing_enabled" in connector_columns:
op.drop_column("search_source_connectors", "periodic_indexing_enabled")

View file

@ -3,6 +3,7 @@
import os
from celery import Celery
from celery.schedules import crontab
from dotenv import load_dotenv
# Load environment variables
@ -12,6 +13,46 @@ load_dotenv()
CELERY_BROKER_URL = os.getenv("CELERY_BROKER_URL", "redis://localhost:6379/0")
CELERY_RESULT_BACKEND = os.getenv("CELERY_RESULT_BACKEND", "redis://localhost:6379/0")
# Get schedule checker interval from environment
# Format: "<number><unit>" where unit is 'm' (minutes) or 'h' (hours)
# Examples: "1m" (every minute), "5m" (every 5 minutes), "1h" (every hour)
SCHEDULE_CHECKER_INTERVAL = os.getenv("SCHEDULE_CHECKER_INTERVAL", "2m")
def parse_schedule_interval(interval: str) -> dict:
"""Parse interval string into crontab parameters.
Args:
interval: String like "1m", "5m", "1h", etc.
Returns:
Dict with crontab parameters (minute, hour)
"""
interval = interval.strip().lower()
# Extract number and unit
if interval.endswith("m") or interval.endswith("min"):
# Minutes
num = int(interval.rstrip("min"))
if num == 1:
return {"minute": "*", "hour": "*"}
else:
return {"minute": f"*/{num}", "hour": "*"}
elif interval.endswith("h") or interval.endswith("hour"):
# Hours
num = int(interval.rstrip("hour"))
if num == 1:
return {"minute": "0", "hour": "*"}
else:
return {"minute": "0", "hour": f"*/{num}"}
else:
# Default to every minute if parsing fails
return {"minute": "*", "hour": "*"}
# Parse the schedule interval
schedule_params = parse_schedule_interval(SCHEDULE_CHECKER_INTERVAL)
# Create Celery app
celery_app = Celery(
"surfsense",
@ -21,6 +62,7 @@ celery_app = Celery(
"app.tasks.celery_tasks.document_tasks",
"app.tasks.celery_tasks.podcast_tasks",
"app.tasks.celery_tasks.connector_tasks",
"app.tasks.celery_tasks.schedule_checker_task",
],
)
@ -47,13 +89,20 @@ celery_app.conf.update(
task_reject_on_worker_lost=True,
# Broker settings
broker_connection_retry_on_startup=True,
# Beat scheduler settings
beat_max_loop_interval=60, # Check every minute
)
# Optional: Configure Celery Beat for periodic tasks
# Configure Celery Beat schedule
# This uses a meta-scheduler pattern: instead of creating individual Beat schedules
# for each connector, we have ONE schedule that checks the database at the configured interval
# for connectors that need indexing. This provides dynamic scheduling without restarts.
celery_app.conf.beat_schedule = {
# Example: Add periodic tasks here if needed
# "periodic-task-name": {
# "task": "app.tasks.celery_tasks.some_task",
# "schedule": crontab(minute=0, hour=0), # Run daily at midnight
# },
"check-periodic-connector-schedules": {
"task": "check_periodic_schedules",
"schedule": crontab(**schedule_params),
"options": {
"expires": 30, # Task expires after 30 seconds if not picked up
},
},
}

View file

@ -285,6 +285,11 @@ class SearchSourceConnector(BaseModel, TimestampMixin):
last_indexed_at = Column(TIMESTAMP(timezone=True), nullable=True)
config = Column(JSON, nullable=False)
# Periodic indexing fields
periodic_indexing_enabled = Column(Boolean, nullable=False, default=False)
indexing_frequency_minutes = Column(Integer, nullable=True)
next_scheduled_at = Column(TIMESTAMP(timezone=True), nullable=True)
search_space_id = Column(
Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False
)

View file

@ -11,7 +11,7 @@ Note: Each search space can have only one connector of each type per user (based
"""
import logging
from datetime import datetime, timedelta
from datetime import UTC, datetime, timedelta
from typing import Any
from fastapi import APIRouter, Depends, HTTPException, Query
@ -52,6 +52,11 @@ from app.tasks.connector_indexers import (
)
from app.users import current_active_user
from app.utils.check_ownership import check_ownership
from app.utils.periodic_scheduler import (
create_periodic_schedule,
delete_periodic_schedule,
update_periodic_schedule,
)
# Set up logging
logger = logging.getLogger(__name__)
@ -124,12 +129,44 @@ async def create_search_source_connector(
status_code=409,
detail=f"A connector with type {connector.connector_type} already exists in this search space. Each search space can have only one connector of each type per user.",
)
# Prepare connector data
connector_data = connector.model_dump()
# Automatically set next_scheduled_at if periodic indexing is enabled
if (
connector.periodic_indexing_enabled
and connector.indexing_frequency_minutes
and connector.next_scheduled_at is None
):
connector_data["next_scheduled_at"] = datetime.now(UTC) + timedelta(
minutes=connector.indexing_frequency_minutes
)
db_connector = SearchSourceConnector(
**connector.model_dump(), search_space_id=search_space_id, user_id=user.id
**connector_data, search_space_id=search_space_id, user_id=user.id
)
session.add(db_connector)
await session.commit()
await session.refresh(db_connector)
# Create periodic schedule if periodic indexing is enabled
if (
db_connector.periodic_indexing_enabled
and db_connector.indexing_frequency_minutes
):
success = create_periodic_schedule(
connector_id=db_connector.id,
search_space_id=search_space_id,
user_id=str(user.id),
connector_type=db_connector.connector_type,
frequency_minutes=db_connector.indexing_frequency_minutes,
)
if not success:
logger.warning(
f"Failed to create periodic schedule for connector {db_connector.id}"
)
return db_connector
except ValidationError as e:
await session.rollback()
@ -224,6 +261,50 @@ async def update_search_source_connector(
# Convert the sparse update data (only fields present in request) to a dict
update_data = connector_update.model_dump(exclude_unset=True)
# Validate periodic indexing fields
# Get the effective values after update
effective_is_indexable = update_data.get("is_indexable", db_connector.is_indexable)
effective_periodic_enabled = update_data.get(
"periodic_indexing_enabled", db_connector.periodic_indexing_enabled
)
effective_frequency = update_data.get(
"indexing_frequency_minutes", db_connector.indexing_frequency_minutes
)
# Validate periodic indexing configuration
if effective_periodic_enabled:
if not effective_is_indexable:
raise HTTPException(
status_code=422,
detail="periodic_indexing_enabled can only be True for indexable connectors",
)
if effective_frequency is None:
raise HTTPException(
status_code=422,
detail="indexing_frequency_minutes is required when periodic_indexing_enabled is True",
)
if effective_frequency <= 0:
raise HTTPException(
status_code=422,
detail="indexing_frequency_minutes must be greater than 0",
)
# Automatically set next_scheduled_at if not provided and periodic indexing is being enabled
if (
"periodic_indexing_enabled" in update_data
or "indexing_frequency_minutes" in update_data
) and "next_scheduled_at" not in update_data:
# Schedule the next indexing based on the frequency
update_data["next_scheduled_at"] = datetime.now(UTC) + timedelta(
minutes=effective_frequency
)
elif (
effective_periodic_enabled is False
and "periodic_indexing_enabled" in update_data
):
# If disabling periodic indexing, clear the next_scheduled_at
update_data["next_scheduled_at"] = None
# Special handling for 'config' field
if "config" in update_data:
incoming_config = update_data["config"] # Config data from the request
@ -290,6 +371,36 @@ async def update_search_source_connector(
try:
await session.commit()
await session.refresh(db_connector)
# Handle periodic schedule updates
if (
"periodic_indexing_enabled" in update_data
or "indexing_frequency_minutes" in update_data
):
if (
db_connector.periodic_indexing_enabled
and db_connector.indexing_frequency_minutes
):
# Create or update the periodic schedule
success = update_periodic_schedule(
connector_id=db_connector.id,
search_space_id=db_connector.search_space_id,
user_id=str(user.id),
connector_type=db_connector.connector_type,
frequency_minutes=db_connector.indexing_frequency_minutes,
)
if not success:
logger.warning(
f"Failed to update periodic schedule for connector {db_connector.id}"
)
else:
# Delete the periodic schedule if disabled
success = delete_periodic_schedule(db_connector.id)
if not success:
logger.warning(
f"Failed to delete periodic schedule for connector {db_connector.id}"
)
return db_connector
except IntegrityError as e:
await session.rollback()
@ -320,6 +431,15 @@ async def delete_search_source_connector(
db_connector = await check_ownership(
session, SearchSourceConnector, connector_id, user
)
# Delete any periodic schedule associated with this connector
if db_connector.periodic_indexing_enabled:
success = delete_periodic_schedule(connector_id)
if not success:
logger.warning(
f"Failed to delete periodic schedule for connector {connector_id}"
)
await session.delete(db_connector)
await session.commit()
return {"message": "Search source connector deleted successfully"}

View file

@ -2,7 +2,7 @@ import uuid
from datetime import datetime
from typing import Any
from pydantic import BaseModel, ConfigDict, field_validator
from pydantic import BaseModel, ConfigDict, field_validator, model_validator
from app.db import SearchSourceConnectorType
from app.utils.validators import validate_connector_config
@ -16,6 +16,9 @@ class SearchSourceConnectorBase(BaseModel):
is_indexable: bool
last_indexed_at: datetime | None = None
config: dict[str, Any]
periodic_indexing_enabled: bool = False
indexing_frequency_minutes: int | None = None
next_scheduled_at: datetime | None = None
@field_validator("config")
@classmethod
@ -25,6 +28,22 @@ class SearchSourceConnectorBase(BaseModel):
connector_type = values.data.get("connector_type")
return validate_connector_config(connector_type, config)
@model_validator(mode="after")
def validate_periodic_indexing(self):
"""Validate that periodic indexing configuration is consistent."""
if self.periodic_indexing_enabled:
if not self.is_indexable:
raise ValueError(
"periodic_indexing_enabled can only be True for indexable connectors"
)
if self.indexing_frequency_minutes is None:
raise ValueError(
"indexing_frequency_minutes is required when periodic_indexing_enabled is True"
)
if self.indexing_frequency_minutes <= 0:
raise ValueError("indexing_frequency_minutes must be greater than 0")
return self
class SearchSourceConnectorCreate(SearchSourceConnectorBase):
pass
@ -36,6 +55,9 @@ class SearchSourceConnectorUpdate(BaseModel):
is_indexable: bool | None = None
last_indexed_at: datetime | None = None
config: dict[str, Any] | None = None
periodic_indexing_enabled: bool | None = None
indexing_frequency_minutes: int | None = None
next_scheduled_at: datetime | None = None
class SearchSourceConnectorRead(SearchSourceConnectorBase, IDModel, TimestampModel):

View file

@ -0,0 +1,129 @@
"""Meta-scheduler task that checks for connectors needing periodic indexing."""
import logging
from datetime import UTC, datetime
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
from sqlalchemy.future import select
from sqlalchemy.pool import NullPool
from app.celery_app import celery_app
from app.config import config
from app.db import SearchSourceConnector, SearchSourceConnectorType
logger = logging.getLogger(__name__)
def get_celery_session_maker():
"""Create async session maker for Celery tasks."""
engine = create_async_engine(
config.DATABASE_URL,
poolclass=NullPool,
echo=False,
)
return async_sessionmaker(engine, expire_on_commit=False)
@celery_app.task(name="check_periodic_schedules")
def check_periodic_schedules_task():
"""
Check all connectors for periodic indexing that's due.
This task runs every minute and triggers indexing for any connector
whose next_scheduled_at time has passed.
"""
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(_check_and_trigger_schedules())
finally:
loop.close()
async def _check_and_trigger_schedules():
"""Check database for connectors that need indexing and trigger their tasks."""
async with get_celery_session_maker()() as session:
try:
# Find all connectors with periodic indexing enabled that are due
now = datetime.now(UTC)
result = await session.execute(
select(SearchSourceConnector).filter(
SearchSourceConnector.periodic_indexing_enabled == True, # noqa: E712
SearchSourceConnector.next_scheduled_at <= now,
)
)
due_connectors = result.scalars().all()
if not due_connectors:
logger.debug("No connectors due for periodic indexing")
return
logger.info(f"Found {len(due_connectors)} connectors due for indexing")
# Import all indexing tasks
from app.tasks.celery_tasks.connector_tasks import (
index_airtable_records_task,
index_clickup_tasks_task,
index_confluence_pages_task,
index_discord_messages_task,
index_elasticsearch_documents_task,
index_github_repos_task,
index_google_calendar_events_task,
index_google_gmail_messages_task,
index_jira_issues_task,
index_linear_issues_task,
index_luma_events_task,
index_notion_pages_task,
index_slack_messages_task,
)
# Map connector types to their tasks
task_map = {
SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task,
SearchSourceConnectorType.NOTION_CONNECTOR: index_notion_pages_task,
SearchSourceConnectorType.GITHUB_CONNECTOR: index_github_repos_task,
SearchSourceConnectorType.LINEAR_CONNECTOR: index_linear_issues_task,
SearchSourceConnectorType.JIRA_CONNECTOR: index_jira_issues_task,
SearchSourceConnectorType.CONFLUENCE_CONNECTOR: index_confluence_pages_task,
SearchSourceConnectorType.CLICKUP_CONNECTOR: index_clickup_tasks_task,
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task,
SearchSourceConnectorType.AIRTABLE_CONNECTOR: index_airtable_records_task,
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: index_google_gmail_messages_task,
SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task,
SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
}
# Trigger indexing for each due connector
for connector in due_connectors:
task = task_map.get(connector.connector_type)
if task:
logger.info(
f"Triggering periodic indexing for connector {connector.id} "
f"({connector.connector_type.value})"
)
task.delay(
connector.id,
connector.search_space_id,
str(connector.user_id),
None, # start_date - uses last_indexed_at
None, # end_date - uses now
)
# Update next_scheduled_at for next run
from datetime import timedelta
connector.next_scheduled_at = now + timedelta(
minutes=connector.indexing_frequency_minutes
)
await session.commit()
else:
logger.warning(
f"No task found for connector type {connector.connector_type}"
)
except Exception as e:
logger.error(f"Error checking periodic schedules: {e!s}", exc_info=True)
await session.rollback()

View file

@ -0,0 +1,171 @@
"""
Utility functions for managing periodic connector indexing schedules.
This module uses a meta-scheduler pattern instead of RedBeat's dynamic schedule creation.
Instead of creating individual Beat schedules for each connector, we:
1. Store schedule configuration in the database (next_scheduled_at, frequency)
2. Have ONE Beat task that runs every minute checking for due connectors
3. Trigger indexing tasks for connectors whose next_scheduled_at has passed
This avoids RedBeat's limitation where new schedules aren't discovered without restart.
"""
import logging
from app.db import SearchSourceConnectorType
logger = logging.getLogger(__name__)
# Mapping of connector types to their corresponding Celery task names
CONNECTOR_TASK_MAP = {
SearchSourceConnectorType.SLACK_CONNECTOR: "index_slack_messages",
SearchSourceConnectorType.NOTION_CONNECTOR: "index_notion_pages",
SearchSourceConnectorType.GITHUB_CONNECTOR: "index_github_repos",
SearchSourceConnectorType.LINEAR_CONNECTOR: "index_linear_issues",
SearchSourceConnectorType.JIRA_CONNECTOR: "index_jira_issues",
SearchSourceConnectorType.CONFLUENCE_CONNECTOR: "index_confluence_pages",
SearchSourceConnectorType.CLICKUP_CONNECTOR: "index_clickup_tasks",
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: "index_google_calendar_events",
SearchSourceConnectorType.AIRTABLE_CONNECTOR: "index_airtable_records",
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: "index_google_gmail_messages",
SearchSourceConnectorType.DISCORD_CONNECTOR: "index_discord_messages",
SearchSourceConnectorType.LUMA_CONNECTOR: "index_luma_events",
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: "index_elasticsearch_documents",
}
def create_periodic_schedule(
connector_id: int,
search_space_id: int,
user_id: str,
connector_type: SearchSourceConnectorType,
frequency_minutes: int,
) -> bool:
"""
Trigger the first indexing run immediately when periodic indexing is enabled.
Note: The periodic schedule is managed by the database (next_scheduled_at field)
and checked by the meta-scheduler task that runs every minute.
This function just triggers the first run for immediate feedback.
Args:
connector_id: ID of the connector
search_space_id: ID of the search space
user_id: User ID
connector_type: Type of connector
frequency_minutes: Frequency in minutes (used for logging)
Returns:
True if successful, False otherwise
"""
try:
logger.info(
f"Periodic indexing enabled for connector {connector_id} "
f"(frequency: {frequency_minutes} minutes). Triggering first run..."
)
# Import all indexing tasks
from app.tasks.celery_tasks.connector_tasks import (
index_airtable_records_task,
index_clickup_tasks_task,
index_confluence_pages_task,
index_discord_messages_task,
index_elasticsearch_documents_task,
index_github_repos_task,
index_google_calendar_events_task,
index_google_gmail_messages_task,
index_jira_issues_task,
index_linear_issues_task,
index_luma_events_task,
index_notion_pages_task,
index_slack_messages_task,
)
# Map connector type to task
task_map = {
SearchSourceConnectorType.SLACK_CONNECTOR: index_slack_messages_task,
SearchSourceConnectorType.NOTION_CONNECTOR: index_notion_pages_task,
SearchSourceConnectorType.GITHUB_CONNECTOR: index_github_repos_task,
SearchSourceConnectorType.LINEAR_CONNECTOR: index_linear_issues_task,
SearchSourceConnectorType.JIRA_CONNECTOR: index_jira_issues_task,
SearchSourceConnectorType.CONFLUENCE_CONNECTOR: index_confluence_pages_task,
SearchSourceConnectorType.CLICKUP_CONNECTOR: index_clickup_tasks_task,
SearchSourceConnectorType.GOOGLE_CALENDAR_CONNECTOR: index_google_calendar_events_task,
SearchSourceConnectorType.AIRTABLE_CONNECTOR: index_airtable_records_task,
SearchSourceConnectorType.GOOGLE_GMAIL_CONNECTOR: index_google_gmail_messages_task,
SearchSourceConnectorType.DISCORD_CONNECTOR: index_discord_messages_task,
SearchSourceConnectorType.LUMA_CONNECTOR: index_luma_events_task,
SearchSourceConnectorType.ELASTICSEARCH_CONNECTOR: index_elasticsearch_documents_task,
}
# Trigger the first run immediately
task = task_map.get(connector_type)
if task:
task.delay(connector_id, search_space_id, user_id, None, None)
logger.info(
f"✓ First indexing run triggered for connector {connector_id}. "
f"Periodic indexing will continue automatically every {frequency_minutes} minutes."
)
else:
logger.error(f"No task mapping found for connector type: {connector_type}")
return False
return True
except Exception as e:
logger.error(
f"Failed to trigger initial indexing for connector {connector_id}: {e!s}",
exc_info=True,
)
return False
def delete_periodic_schedule(connector_id: int) -> bool:
"""
Handle deletion of periodic schedule for a connector.
Note: With the meta-scheduler pattern, the schedule is managed in the database.
The next_scheduled_at field being set to None effectively disables it.
This function just logs the action.
Args:
connector_id: ID of the connector
Returns:
True (always successful since database handles the state)
"""
logger.info(f"Periodic indexing disabled for connector {connector_id}")
return True
def update_periodic_schedule(
connector_id: int,
search_space_id: int,
user_id: str,
connector_type: SearchSourceConnectorType,
frequency_minutes: int,
) -> bool:
"""
Update an existing periodic schedule for a connector.
Note: With the meta-scheduler pattern, updates are handled by the database.
This function logs the update and optionally triggers an immediate run.
Args:
connector_id: ID of the connector
search_space_id: ID of the search space
user_id: User ID
connector_type: Type of connector
frequency_minutes: New frequency in minutes
Returns:
True if successful, False otherwise
"""
logger.info(
f"Periodic indexing schedule updated for connector {connector_id} "
f"(new frequency: {frequency_minutes} minutes)"
)
# Optionally trigger an immediate run with the new schedule
# Uncomment the line below if you want immediate execution on schedule update
# return create_periodic_schedule(connector_id, search_space_id, user_id, connector_type, frequency_minutes)
return True

View file

@ -1,7 +1,15 @@
"use client";
import { format } from "date-fns";
import { Calendar as CalendarIcon, Edit, Plus, RefreshCw, Trash2 } from "lucide-react";
import {
Calendar as CalendarIcon,
Clock,
Edit,
Loader2,
Plus,
RefreshCw,
Trash2,
} from "lucide-react";
import { motion } from "motion/react";
import { useParams, useRouter } from "next/navigation";
import { useEffect, useState } from "react";
@ -28,8 +36,17 @@ import {
DialogHeader,
DialogTitle,
} from "@/components/ui/dialog";
import { Input } from "@/components/ui/input";
import { Label } from "@/components/ui/label";
import { Popover, PopoverContent, PopoverTrigger } from "@/components/ui/popover";
import {
Select,
SelectContent,
SelectItem,
SelectTrigger,
SelectValue,
} from "@/components/ui/select";
import { Switch } from "@/components/ui/switch";
import {
Table,
TableBody,
@ -64,7 +81,7 @@ export default function ConnectorsPage() {
const searchSpaceId = params.search_space_id as string;
const today = new Date();
const { connectors, isLoading, error, deleteConnector, indexConnector } =
const { connectors, isLoading, error, deleteConnector, indexConnector, updateConnector } =
useSearchSourceConnectors(false, parseInt(searchSpaceId));
const [connectorToDelete, setConnectorToDelete] = useState<number | null>(null);
const [indexingConnectorId, setIndexingConnectorId] = useState<number | null>(null);
@ -75,6 +92,16 @@ export default function ConnectorsPage() {
const [startDate, setStartDate] = useState<Date | undefined>(undefined);
const [endDate, setEndDate] = useState<Date | undefined>(undefined);
// Periodic indexing state
const [periodicDialogOpen, setPeriodicDialogOpen] = useState(false);
const [selectedConnectorForPeriodic, setSelectedConnectorForPeriodic] = useState<number | null>(
null
);
const [periodicEnabled, setPeriodicEnabled] = useState(false);
const [frequencyMinutes, setFrequencyMinutes] = useState<string>("1440");
const [customFrequency, setCustomFrequency] = useState<string>("");
const [isSavingPeriodic, setIsSavingPeriodic] = useState(false);
useEffect(() => {
if (error) {
toast.error("Failed to load connectors");
@ -141,6 +168,84 @@ export default function ConnectorsPage() {
}
};
// Handle opening periodic indexing dialog
const handleOpenPeriodicDialog = (connectorId: number) => {
const connector = connectors.find((c) => c.id === connectorId);
if (!connector) return;
setSelectedConnectorForPeriodic(connectorId);
setPeriodicEnabled(connector.periodic_indexing_enabled);
if (connector.indexing_frequency_minutes) {
// Check if it's a preset value
const presetValues = ["15", "60", "360", "720", "1440", "10080"];
if (presetValues.includes(connector.indexing_frequency_minutes.toString())) {
setFrequencyMinutes(connector.indexing_frequency_minutes.toString());
setCustomFrequency("");
} else {
setFrequencyMinutes("custom");
setCustomFrequency(connector.indexing_frequency_minutes.toString());
}
} else {
setFrequencyMinutes("1440");
setCustomFrequency("");
}
setPeriodicDialogOpen(true);
};
// Handle saving periodic indexing configuration
const handleSavePeriodicIndexing = async () => {
if (selectedConnectorForPeriodic === null) return;
const connector = connectors.find((c) => c.id === selectedConnectorForPeriodic);
if (!connector) return;
setIsSavingPeriodic(true);
try {
// Determine the frequency value
let frequency: number | null = null;
if (periodicEnabled) {
if (frequencyMinutes === "custom") {
frequency = parseInt(customFrequency, 10);
if (isNaN(frequency) || frequency <= 0) {
toast.error("Please enter a valid frequency in minutes");
setIsSavingPeriodic(false);
return;
}
} else {
frequency = parseInt(frequencyMinutes, 10);
}
}
await updateConnector(selectedConnectorForPeriodic, {
periodic_indexing_enabled: periodicEnabled,
indexing_frequency_minutes: frequency,
});
toast.success(
periodicEnabled
? "Periodic indexing enabled successfully"
: "Periodic indexing disabled successfully"
);
setPeriodicDialogOpen(false);
} catch (error) {
console.error("Error updating periodic indexing:", error);
toast.error(error instanceof Error ? error.message : "Failed to update periodic indexing");
} finally {
setIsSavingPeriodic(false);
setSelectedConnectorForPeriodic(null);
}
};
// Format frequency for display
const formatFrequency = (minutes: number): string => {
if (minutes < 60) return `${minutes}m`;
if (minutes < 1440) return `${Math.floor(minutes / 60)}h`;
if (minutes < 10080) return `${Math.floor(minutes / 1440)}d`;
return `${Math.floor(minutes / 10080)}w`;
};
return (
<div className="container mx-auto py-8 max-w-6xl">
<motion.div
@ -193,6 +298,7 @@ export default function ConnectorsPage() {
<TableHead>Name</TableHead>
<TableHead>Type</TableHead>
<TableHead>Last Indexed</TableHead>
<TableHead>Periodic</TableHead>
<TableHead className="text-right">Actions</TableHead>
</TableRow>
</TableHeader>
@ -206,6 +312,41 @@ export default function ConnectorsPage() {
? formatDateTime(connector.last_indexed_at)
: "Not indexable"}
</TableCell>
<TableCell>
{connector.is_indexable ? (
connector.periodic_indexing_enabled ? (
<TooltipProvider>
<Tooltip>
<TooltipTrigger asChild>
<div className="flex items-center gap-1 text-green-600 dark:text-green-400">
<Clock className="h-4 w-4" />
<span className="text-sm font-medium">
{connector.indexing_frequency_minutes
? formatFrequency(connector.indexing_frequency_minutes)
: "Enabled"}
</span>
</div>
</TooltipTrigger>
<TooltipContent>
<p>
Runs every {connector.indexing_frequency_minutes} minutes
{connector.next_scheduled_at && (
<>
<br />
Next: {formatDateTime(connector.next_scheduled_at)}
</>
)}
</p>
</TooltipContent>
</Tooltip>
</TooltipProvider>
) : (
<span className="text-sm text-muted-foreground">Disabled</span>
)
) : (
<span className="text-sm text-muted-foreground">-</span>
)}
</TableCell>
<TableCell className="text-right">
<div className="flex justify-end gap-2">
{connector.is_indexable && (
@ -256,6 +397,25 @@ export default function ConnectorsPage() {
</TooltipProvider>
</div>
)}
{connector.is_indexable && (
<TooltipProvider>
<Tooltip>
<TooltipTrigger asChild>
<Button
variant="outline"
size="sm"
onClick={() => handleOpenPeriodicDialog(connector.id)}
>
<Clock className="h-4 w-4" />
<span className="sr-only">Configure Periodic Indexing</span>
</Button>
</TooltipTrigger>
<TooltipContent>
<p>Configure Periodic Indexing</p>
</TooltipContent>
</Tooltip>
</TooltipProvider>
)}
<Button
variant="outline"
size="sm"
@ -424,6 +584,110 @@ export default function ConnectorsPage() {
</DialogFooter>
</DialogContent>
</Dialog>
{/* Periodic Indexing Configuration Dialog */}
<Dialog open={periodicDialogOpen} onOpenChange={setPeriodicDialogOpen}>
<DialogContent className="sm:max-w-[500px]">
<DialogHeader>
<DialogTitle>Configure Periodic Indexing</DialogTitle>
<DialogDescription>
Set up automatic indexing at regular intervals for this connector.
</DialogDescription>
</DialogHeader>
<div className="grid gap-6 py-4">
<div className="flex items-center justify-between space-x-2">
<div className="space-y-0.5">
<Label htmlFor="periodic-enabled" className="text-base">
Enable Periodic Indexing
</Label>
<p className="text-sm text-muted-foreground">
Automatically index this connector at regular intervals
</p>
</div>
<Switch
id="periodic-enabled"
checked={periodicEnabled}
onCheckedChange={setPeriodicEnabled}
/>
</div>
{periodicEnabled && (
<div className="space-y-4">
<div className="space-y-2">
<Label htmlFor="frequency">Indexing Frequency</Label>
<Select value={frequencyMinutes} onValueChange={setFrequencyMinutes}>
<SelectTrigger id="frequency">
<SelectValue placeholder="Select frequency" />
</SelectTrigger>
<SelectContent>
<SelectItem value="15">Every 15 minutes</SelectItem>
<SelectItem value="60">Every hour</SelectItem>
<SelectItem value="360">Every 6 hours</SelectItem>
<SelectItem value="720">Every 12 hours</SelectItem>
<SelectItem value="1440">Daily (24 hours)</SelectItem>
<SelectItem value="10080">Weekly (7 days)</SelectItem>
<SelectItem value="custom">Custom</SelectItem>
</SelectContent>
</Select>
</div>
{frequencyMinutes === "custom" && (
<div className="space-y-2">
<Label htmlFor="custom-frequency">Custom Frequency (minutes)</Label>
<Input
id="custom-frequency"
type="number"
min="1"
placeholder="Enter minutes"
value={customFrequency}
onChange={(e) => setCustomFrequency(e.target.value)}
/>
<p className="text-xs text-muted-foreground">
Enter the number of minutes between each indexing run
</p>
</div>
)}
<div className="rounded-lg bg-muted p-3 text-sm">
<p className="font-medium mb-1">Preview:</p>
<p className="text-muted-foreground">
{frequencyMinutes === "custom" && customFrequency
? `Will run every ${customFrequency} minutes`
: frequencyMinutes === "15"
? "Will run every 15 minutes"
: frequencyMinutes === "60"
? "Will run every hour"
: frequencyMinutes === "360"
? "Will run every 6 hours"
: frequencyMinutes === "720"
? "Will run every 12 hours"
: frequencyMinutes === "1440"
? "Will run daily (every 24 hours)"
: frequencyMinutes === "10080"
? "Will run weekly (every 7 days)"
: "Select a frequency above"}
</p>
</div>
</div>
)}
</div>
<DialogFooter>
<Button
variant="outline"
onClick={() => {
setPeriodicDialogOpen(false);
setSelectedConnectorForPeriodic(null);
}}
>
Cancel
</Button>
<Button onClick={handleSavePeriodicIndexing} disabled={isSavingPeriodic}>
{isSavingPeriodic && <Loader2 className="mr-2 h-4 w-4 animate-spin" />}
Save Configuration
</Button>
</DialogFooter>
</DialogContent>
</Dialog>
</div>
);
}

View file

@ -129,11 +129,18 @@ const MobileNav = ({ navItems, isScrolled }: any) => {
<Logo className="h-8 w-8 rounded-md" />
<span className="dark:text-white/90 text-gray-800 text-lg font-bold">SurfSense</span>
</div>
{open ? (
<IconX className="text-black dark:text-white" onClick={() => setOpen(!open)} />
) : (
<IconMenu2 className="text-black dark:text-white" onClick={() => setOpen(!open)} />
)}
<button
type="button"
onClick={() => setOpen(!open)}
className="relative z-50 flex items-center justify-center p-2 -mr-2 rounded-lg hover:bg-gray-100 dark:hover:bg-neutral-800 transition-colors touch-manipulation"
aria-label={open ? "Close menu" : "Open menu"}
>
{open ? (
<IconX className="h-6 w-6 text-black dark:text-white" />
) : (
<IconMenu2 className="h-6 w-6 text-black dark:text-white" />
)}
</button>
</div>
<AnimatePresence>
@ -155,10 +162,10 @@ const MobileNav = ({ navItems, isScrolled }: any) => {
))}
<div className="flex w-full items-center gap-2 pt-2">
<Link
href="https://discord.gg/your-server"
href="https://discord.gg/ejRNvftDp9"
target="_blank"
rel="noopener noreferrer"
className="flex items-center justify-center rounded-lg p-2 hover:bg-gray-100 dark:hover:bg-neutral-800 transition-colors"
className="flex items-center justify-center rounded-lg p-2 hover:bg-gray-100 dark:hover:bg-neutral-800 transition-colors touch-manipulation"
>
<IconBrandDiscord className="h-5 w-5 text-neutral-600 dark:text-neutral-300" />
</Link>
@ -166,7 +173,7 @@ const MobileNav = ({ navItems, isScrolled }: any) => {
href="https://github.com/MODSetter/SurfSense"
target="_blank"
rel="noopener noreferrer"
className="flex items-center gap-1.5 rounded-lg px-3 py-2 hover:bg-gray-100 dark:hover:bg-neutral-800 transition-colors"
className="flex items-center gap-1.5 rounded-lg px-3 py-2 hover:bg-gray-100 dark:hover:bg-neutral-800 transition-colors touch-manipulation"
>
<IconBrandGithub className="h-5 w-5 text-neutral-600 dark:text-neutral-300" />
{loadingGithubStars ? (
@ -179,12 +186,12 @@ const MobileNav = ({ navItems, isScrolled }: any) => {
</Link>
<ThemeTogglerComponent />
</div>
<button
type="button"
className="w-full rounded-lg bg-black px-8 py-2 font-medium text-white shadow-[0px_-2px_0px_0px_rgba(255,255,255,0.4)_inset] dark:bg-white dark:text-black"
<Link
href="/contact"
className="w-full rounded-lg bg-black px-8 py-2 font-medium text-white shadow-[0px_-2px_0px_0px_rgba(255,255,255,0.4)_inset] dark:bg-white dark:text-black text-center touch-manipulation"
>
Book a call
</button>
</Link>
</motion.div>
)}
</AnimatePresence>

View file

@ -98,6 +98,8 @@ Before you begin, ensure you have:
| LLAMA_CLOUD_API_KEY | API key for LlamaCloud service for document parsing (required if ETL_SERVICE=LLAMACLOUD) |
| CELERY_BROKER_URL | Redis connection URL for Celery broker (e.g., `redis://localhost:6379/0`) |
| CELERY_RESULT_BACKEND | Redis connection URL for Celery result backend (e.g., `redis://localhost:6379/0`) |
| SCHEDULE_CHECKER_INTERVAL | (Optional) How often to check for scheduled connector tasks. Format: `<number><unit>` where unit is `m` (minutes) or `h` (hours). Examples: `1m`, `5m`, `1h`, `2h` (default: `1m`) |
| REGISTRATION_ENABLED | (Optional) Enable or disable new user registration (e.g., `TRUE` or `FALSE`, default: `TRUE`) |
**Optional Backend LangSmith Observability:**
@ -181,6 +183,22 @@ For more details, see the [Uvicorn documentation](https://www.uvicorn.org/#comma
- API Documentation: [http://localhost:8000/docs](http://localhost:8000/docs)
- pgAdmin: [http://localhost:5050](http://localhost:5050)
## Docker Services Overview
The Docker setup includes several services that work together:
- **Backend**: FastAPI application server
- **Frontend**: Next.js web application
- **PostgreSQL (db)**: Database with pgvector extension
- **Redis**: Message broker for Celery
- **Celery Worker**: Handles background tasks (document processing, indexing, etc.)
- **Celery Beat**: Scheduler for periodic tasks (enables scheduled connector indexing)
- The schedule interval can be configured using the `SCHEDULE_CHECKER_INTERVAL` environment variable in your backend `.env` file
- Default: checks every minute for connectors that need indexing
- **pgAdmin**: Database management interface
All services start automatically with `docker compose up`. The Celery Beat service ensures that periodic indexing functionality works out of the box.
## Using pgAdmin
pgAdmin is included in the Docker setup to help manage your PostgreSQL database. To connect:

View file

@ -86,6 +86,8 @@ Edit the `.env` file and set the following variables:
| LLAMA_CLOUD_API_KEY | API key for LlamaCloud service for document parsing (required if ETL_SERVICE=LLAMACLOUD) |
| CELERY_BROKER_URL | Redis connection URL for Celery broker (e.g., `redis://localhost:6379/0`) |
| CELERY_RESULT_BACKEND | Redis connection URL for Celery result backend (e.g., `redis://localhost:6379/0`) |
| SCHEDULE_CHECKER_INTERVAL | (Optional) How often to check for scheduled connector tasks. Format: `<number><unit>` where unit is `m` (minutes) or `h` (hours). Examples: `1m`, `5m`, `1h`, `2h` (default: `1m`) |
| REGISTRATION_ENABLED | (Optional) Enable or disable new user registration (e.g., `TRUE` or `FALSE`, default: `TRUE`) |
**(Optional) Backend LangSmith Observability:**
@ -249,7 +251,23 @@ uv run celery -A celery_worker.celery_app flower --port=5555
Access Flower at [http://localhost:5555](http://localhost:5555) to monitor your Celery tasks.
### 5. Run the Backend
### 5. Start Celery Beat (Scheduler)
In another new terminal window, start Celery Beat to enable periodic tasks (like scheduled connector indexing):
**Linux/macOS/Windows:**
```bash
# Make sure you're in the surfsense_backend directory
cd surfsense_backend
# Start Celery Beat
uv run celery -A celery_worker.celery_app beat --loglevel=info
```
**Important**: Celery Beat is required for the periodic indexing functionality to work. Without it, scheduled connector tasks won't run automatically. The schedule interval can be configured using the `SCHEDULE_CHECKER_INTERVAL` environment variable.
### 6. Run the Backend
Start the backend server:

View file

@ -10,6 +10,9 @@ export interface SearchSourceConnector {
search_space_id: number;
user_id?: string;
created_at?: string;
periodic_indexing_enabled: boolean;
indexing_frequency_minutes: number | null;
next_scheduled_at: string | null;
}
export interface ConnectorSourceItem {