From 4e4f7f34faaa40102a8201a8e19c335c8b5dcd27 Mon Sep 17 00:00:00 2001
From: CREDO23
Date: Fri, 12 Jun 2026 16:48:18 +0200
Subject: [PATCH] feat(index-cache): add TTL/size eviction task and daily schedule

---
Review notes (this section is ignored by `git am`):
- task.py: blob-delete failures in _drop() are now logged instead of being
  silently suppressed. The hunk size and diffstat were updated to match, but
  the blob id on task.py's "index" line predates this change.
- Line structure and indentation were restored from a whitespace-flattened
  copy of this patch. Confirm the celery_app.py context lines against the
  tree before applying.

 surfsense_backend/app/celery_app.py           |  7 ++
 .../cache/eviction/__init__.py                |  9 +++
 .../indexing_pipeline/cache/eviction/task.py  | 90 +++++++++++++++++++++++++
 3 files changed, 106 insertions(+)
 create mode 100644 surfsense_backend/app/indexing_pipeline/cache/eviction/__init__.py
 create mode 100644 surfsense_backend/app/indexing_pipeline/cache/eviction/task.py

diff --git a/surfsense_backend/app/celery_app.py b/surfsense_backend/app/celery_app.py
index 413522189..38fb12a32 100644
--- a/surfsense_backend/app/celery_app.py
+++ b/surfsense_backend/app/celery_app.py
@@ -193,6 +193,7 @@ celery_app = Celery(
         "app.tasks.celery_tasks.auto_reload_task",
         "app.tasks.celery_tasks.gateway_tasks",
         "app.etl_pipeline.cache.eviction.task",
+        "app.indexing_pipeline.cache.eviction.task",
         "app.automations.tasks.execute_run",
         "app.automations.triggers.builtin.schedule.selector",
         "app.automations.triggers.builtin.event.selector",
@@ -313,6 +314,12 @@ celery_app.conf.beat_schedule = {
         "schedule": crontab(hour="4", minute="0"),
         "options": {"expires": 600},
     },
+    # Prune the index cache (chunk+embedding sets) once daily, off-peak.
+    "evict-index-cache": {
+        "task": "evict_index_cache",
+        "schedule": crontab(hour="4", minute="30"),
+        "options": {"expires": 600},
+    },
     # Fire due automation schedule triggers (Beat entry owned by the schedule
     # trigger; see app.automations.triggers.builtin.schedule.source).
     **SCHEDULE_BEAT_SCHEDULE,
diff --git a/surfsense_backend/app/indexing_pipeline/cache/eviction/__init__.py b/surfsense_backend/app/indexing_pipeline/cache/eviction/__init__.py
new file mode 100644
index 000000000..de4df784e
--- /dev/null
+++ b/surfsense_backend/app/indexing_pipeline/cache/eviction/__init__.py
@@ -0,0 +1,9 @@
+"""Background pruning of the index cache by age and size budget."""
+
+from __future__ import annotations
+
+from .task import evict_index_cache_task
+
+__all__ = [
+    "evict_index_cache_task",
+]
diff --git a/surfsense_backend/app/indexing_pipeline/cache/eviction/task.py b/surfsense_backend/app/indexing_pipeline/cache/eviction/task.py
new file mode 100644
index 000000000..ab6885bca
--- /dev/null
+++ b/surfsense_backend/app/indexing_pipeline/cache/eviction/task.py
@@ -0,0 +1,90 @@
+"""Celery task that prunes the index cache by TTL, then by size budget."""
+
+from __future__ import annotations
+
+import logging
+from datetime import UTC, datetime, timedelta
+
+from app.celery_app import celery_app
+from app.etl_pipeline.cache.eviction.policy import select_over_budget
+from app.etl_pipeline.cache.schemas import EvictionCandidate
+from app.indexing_pipeline.cache.persistence import CachedEmbeddingSetRepository
+from app.indexing_pipeline.cache.settings import load_index_cache_settings
+from app.indexing_pipeline.cache.storage import EmbeddingCacheStore
+from app.observability import metrics
+from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
+
+logger = logging.getLogger(__name__)
+
+
+@celery_app.task(name="evict_index_cache")
+def evict_index_cache_task():
+    """Run one index-cache eviction pass; scheduled daily by Celery Beat."""
+    return run_async_celery_task(_evict)
+
+
+async def _evict() -> None:
+    """Expire stale entries, then shed the coldest overflow only if still over budget.
+
+    No-op when the index cache is disabled. Both phases select candidates with
+    ``limit=settings.eviction_batch``.
+    """
+    settings = load_index_cache_settings()
+    if not settings.enabled:
+        return
+
+    store = EmbeddingCacheStore()
+    async with get_celery_session_maker()() as session:
+        index = CachedEmbeddingSetRepository(session)
+
+        # Phase 1 (TTL): evict entries selected as expired relative to the cutoff.
+        cutoff = datetime.now(UTC) - timedelta(days=settings.ttl_days)
+        expired = await index.select_expired(
+            cutoff=cutoff, limit=settings.eviction_batch
+        )
+        await _drop(index, store, expired, phase="ttl")
+
+        # Phase 2 (size): re-measure after the TTL pass so only real overflow is shed.
+        total = await index.total_size_bytes()
+        if total > settings.max_total_bytes:
+            coldest = await index.select_coldest(limit=settings.eviction_batch)
+            over_budget = select_over_budget(
+                coldest,
+                current_total_bytes=total,
+                max_total_bytes=settings.max_total_bytes,
+            )
+            await _drop(index, store, over_budget, phase="size")
+
+
+async def _drop(
+    index: CachedEmbeddingSetRepository,
+    store: EmbeddingCacheStore,
+    candidates: list[EvictionCandidate],
+    *,
+    phase: str,
+) -> None:
+    """Delete the candidates' blobs and index rows, then record the eviction.
+
+    Args:
+        index: Repository whose rows are removed via ``delete_by_ids``.
+        store: Blob store; each candidate's ``storage_key`` is deleted from it.
+        candidates: Entries to evict; an empty list is a no-op.
+        phase: Label ("ttl" or "size") attached to the metric and log lines.
+    """
+    if not candidates:
+        return
+    for candidate in candidates:
+        # Drop the index row even if the blob delete fails (orphan blob is harmless),
+        # but log the failure so leaked blobs do not go unnoticed.
+        try:
+            await store.delete(candidate.storage_key)
+        except Exception:
+            logger.warning(
+                "Failed to delete cached embedding blob %s (%s)",
+                candidate.storage_key,
+                phase,
+                exc_info=True,
+            )
+    await index.delete_by_ids([candidate.id for candidate in candidates])
+    metrics.record_index_cache_eviction(len(candidates), phase=phase)
+    logger.info("Evicted %d cached embedding sets (%s)", len(candidates), phase)