feat: add periodic tasks to backend DB and frontend hooks

- TODO: Add Celery RedBeat and create tasks dynamically in our Redis
DESKTOP-RTLN3BA\$punk 2025-10-22 16:14:25 -07:00
parent 70808eb08b
commit 182f815bb7
8 changed files with 484 additions and 31 deletions


@@ -285,6 +285,11 @@ class SearchSourceConnector(BaseModel, TimestampMixin):
     last_indexed_at = Column(TIMESTAMP(timezone=True), nullable=True)
     config = Column(JSON, nullable=False)

+    # Periodic indexing fields
+    periodic_indexing_enabled = Column(Boolean, nullable=False, default=False)
+    indexing_frequency_minutes = Column(Integer, nullable=True)
+    next_scheduled_at = Column(TIMESTAMP(timezone=True), nullable=True)
+
     search_space_id = Column(
         Integer, ForeignKey("searchspaces.id", ondelete="CASCADE"), nullable=False
     )
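
These three columns are everything a scheduler needs to find work. A minimal sketch of the "due connectors" query, assuming the project's async SQLAlchemy setup; get_due_connectors is a hypothetical helper, not part of this commit:

    from datetime import UTC, datetime

    from sqlalchemy import select
    from sqlalchemy.ext.asyncio import AsyncSession

    from app.db import SearchSourceConnector  # assumed import path


    async def get_due_connectors(session: AsyncSession) -> list[SearchSourceConnector]:
        """Fetch connectors whose periodic indexing is due (hypothetical helper)."""
        result = await session.execute(
            select(SearchSourceConnector).where(
                SearchSourceConnector.periodic_indexing_enabled.is_(True),
                SearchSourceConnector.next_scheduled_at <= datetime.now(UTC),
            )
        )
        return list(result.scalars().all())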


@@ -11,7 +11,7 @@ Note: Each search space can have only one connector of each type per user (based
 """

 import logging
-from datetime import datetime, timedelta
+from datetime import UTC, datetime, timedelta
 from typing import Any

 from fastapi import APIRouter, Depends, HTTPException, Query
@@ -124,8 +124,22 @@ async def create_search_source_connector(
             status_code=409,
             detail=f"A connector with type {connector.connector_type} already exists in this search space. Each search space can have only one connector of each type per user.",
         )

+    # Prepare connector data
+    connector_data = connector.model_dump()
+
+    # Automatically set next_scheduled_at if periodic indexing is enabled
+    if (
+        connector.periodic_indexing_enabled
+        and connector.indexing_frequency_minutes
+        and connector.next_scheduled_at is None
+    ):
+        connector_data["next_scheduled_at"] = datetime.now(UTC) + timedelta(
+            minutes=connector.indexing_frequency_minutes
+        )
+
     db_connector = SearchSourceConnector(
-        **connector.model_dump(), search_space_id=search_space_id, user_id=user.id
+        **connector_data, search_space_id=search_space_id, user_id=user.id
     )
     session.add(db_connector)
     await session.commit()
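
With this guard in place, a connector created with indexing_frequency_minutes=60 and no explicit next_scheduled_at gets its first run scheduled one hour after creation. The same arithmetic as a quick standalone check (timestamps are illustrative):

    from datetime import UTC, datetime, timedelta

    created_at = datetime(2025, 10, 22, 23, 14, tzinfo=UTC)
    frequency_minutes = 60

    next_scheduled_at = created_at + timedelta(minutes=frequency_minutes)
    print(next_scheduled_at)  # 2025-10-23 00:14:00+00:00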
@@ -224,6 +238,50 @@ async def update_search_source_connector(
     # Convert the sparse update data (only fields present in request) to a dict
     update_data = connector_update.model_dump(exclude_unset=True)

+    # Validate periodic indexing fields
+    # Get the effective values after update
+    effective_is_indexable = update_data.get("is_indexable", db_connector.is_indexable)
+    effective_periodic_enabled = update_data.get(
+        "periodic_indexing_enabled", db_connector.periodic_indexing_enabled
+    )
+    effective_frequency = update_data.get(
+        "indexing_frequency_minutes", db_connector.indexing_frequency_minutes
+    )
+
+    # Validate periodic indexing configuration
+    if effective_periodic_enabled:
+        if not effective_is_indexable:
+            raise HTTPException(
+                status_code=422,
+                detail="periodic_indexing_enabled can only be True for indexable connectors",
+            )
+        if effective_frequency is None:
+            raise HTTPException(
+                status_code=422,
+                detail="indexing_frequency_minutes is required when periodic_indexing_enabled is True",
+            )
+        if effective_frequency <= 0:
+            raise HTTPException(
+                status_code=422,
+                detail="indexing_frequency_minutes must be greater than 0",
+            )
+
+        # Automatically set next_scheduled_at if not provided and periodic indexing is being enabled
+        if (
+            "periodic_indexing_enabled" in update_data
+            or "indexing_frequency_minutes" in update_data
+        ) and "next_scheduled_at" not in update_data:
+            # Schedule the next indexing based on the frequency
+            update_data["next_scheduled_at"] = datetime.now(UTC) + timedelta(
+                minutes=effective_frequency
+            )
+    elif (
+        effective_periodic_enabled is False
+        and "periodic_indexing_enabled" in update_data
+    ):
+        # If disabling periodic indexing, clear the next_scheduled_at
+        update_data["next_scheduled_at"] = None
+
     # Special handling for 'config' field
     if "config" in update_data:
         incoming_config = update_data["config"]  # Config data from the request
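
The TODO in the commit message points at Celery RedBeat for actually firing these schedules. A minimal sketch of creating an entry dynamically in Redis, using RedBeat's documented RedBeatSchedulerEntry API; the task path app.tasks.run_connector_indexing and the app.celery_app module are assumptions, not part of this commit:

    from celery.schedules import schedule
    from redbeat import RedBeatSchedulerEntry

    from app.celery_app import celery_app  # hypothetical Celery app module


    def schedule_connector_indexing(connector_id: int, frequency_minutes: int) -> None:
        """Create or update the RedBeat entry for one connector (sketch)."""
        entry = RedBeatSchedulerEntry(
            name=f"index-connector-{connector_id}",
            task="app.tasks.run_connector_indexing",  # assumed task path
            schedule=schedule(run_every=frequency_minutes * 60),  # run_every is in seconds
            args=[connector_id],
            app=celery_app,
        )
        entry.save()  # writes the entry to Redis, where celery beat picks it up

Disabling periodic indexing would then map to RedBeatSchedulerEntry.from_key(f"redbeat:index-connector-{connector_id}", app=celery_app).delete().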


@@ -2,7 +2,7 @@ import uuid
 from datetime import datetime
 from typing import Any

-from pydantic import BaseModel, ConfigDict, field_validator
+from pydantic import BaseModel, ConfigDict, field_validator, model_validator

 from app.db import SearchSourceConnectorType
 from app.utils.validators import validate_connector_config
@@ -16,6 +16,9 @@ class SearchSourceConnectorBase(BaseModel):
     is_indexable: bool
     last_indexed_at: datetime | None = None
     config: dict[str, Any]
+    periodic_indexing_enabled: bool = False
+    indexing_frequency_minutes: int | None = None
+    next_scheduled_at: datetime | None = None

     @field_validator("config")
     @classmethod
@@ -25,6 +28,22 @@ class SearchSourceConnectorBase(BaseModel):
         connector_type = values.data.get("connector_type")
         return validate_connector_config(connector_type, config)

+    @model_validator(mode="after")
+    def validate_periodic_indexing(self):
+        """Validate that periodic indexing configuration is consistent."""
+        if self.periodic_indexing_enabled:
+            if not self.is_indexable:
+                raise ValueError(
+                    "periodic_indexing_enabled can only be True for indexable connectors"
+                )
+            if self.indexing_frequency_minutes is None:
+                raise ValueError(
+                    "indexing_frequency_minutes is required when periodic_indexing_enabled is True"
+                )
+            if self.indexing_frequency_minutes <= 0:
+                raise ValueError("indexing_frequency_minutes must be greater than 0")
+        return self
+

 class SearchSourceConnectorCreate(SearchSourceConnectorBase):
     pass
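
Because the validator runs with mode="after", an inconsistent payload is rejected at parse time rather than deep in a route handler. A self-contained sketch of the same pattern using a stripped-down stand-in model (not the project's schema):

    from pydantic import BaseModel, ValidationError, model_validator


    class PeriodicConfig(BaseModel):
        """Minimal stand-in mirroring the commit's validator logic."""

        is_indexable: bool = True
        periodic_indexing_enabled: bool = False
        indexing_frequency_minutes: int | None = None

        @model_validator(mode="after")
        def validate_periodic_indexing(self):
            if self.periodic_indexing_enabled:
                if not self.is_indexable:
                    raise ValueError("periodic indexing requires an indexable connector")
                if self.indexing_frequency_minutes is None:
                    raise ValueError("indexing_frequency_minutes is required")
                if self.indexing_frequency_minutes <= 0:
                    raise ValueError("indexing_frequency_minutes must be greater than 0")
            return self


    try:
        PeriodicConfig(periodic_indexing_enabled=True)  # frequency omitted
    except ValidationError as exc:
        print(exc.errors()[0]["msg"])  # Value error, indexing_frequency_minutes is required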
@@ -36,6 +55,9 @@ class SearchSourceConnectorUpdate(BaseModel):
     is_indexable: bool | None = None
     last_indexed_at: datetime | None = None
     config: dict[str, Any] | None = None
+    periodic_indexing_enabled: bool | None = None
+    indexing_frequency_minutes: int | None = None
+    next_scheduled_at: datetime | None = None


 class SearchSourceConnectorRead(SearchSourceConnectorBase, IDModel, TimestampModel):
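
Every field on the update schema defaults to None, so the route's model_dump(exclude_unset=True) only yields keys the client actually sent; that is what makes the "effective value" merging in the update route work. A quick illustration with a stripped-down stand-in:

    from pydantic import BaseModel


    class ConnectorUpdate(BaseModel):
        """Stripped-down stand-in for SearchSourceConnectorUpdate."""

        periodic_indexing_enabled: bool | None = None
        indexing_frequency_minutes: int | None = None


    update = ConnectorUpdate(periodic_indexing_enabled=True)
    print(update.model_dump(exclude_unset=True))
    # {'periodic_indexing_enabled': True} -- only the field the client sent

    update = ConnectorUpdate(indexing_frequency_minutes=None)
    print(update.model_dump(exclude_unset=True))
    # {'indexing_frequency_minutes': None} -- an explicit None still counts as set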