From 324ba141a60648dc012f9edfd53be0eafaa9e2b4 Mon Sep 17 00:00:00 2001 From: CREDO23 Date: Fri, 12 Jun 2026 11:23:40 +0200 Subject: [PATCH] feat(etl-cache): add eviction task and public API --- .../app/etl_pipeline/cache/__init__.py | 11 ++++ .../etl_pipeline/cache/eviction/__init__.py | 9 +++ .../app/etl_pipeline/cache/eviction/task.py | 62 +++++++++++++++++++ 3 files changed, 82 insertions(+) create mode 100644 surfsense_backend/app/etl_pipeline/cache/__init__.py create mode 100644 surfsense_backend/app/etl_pipeline/cache/eviction/__init__.py create mode 100644 surfsense_backend/app/etl_pipeline/cache/eviction/task.py diff --git a/surfsense_backend/app/etl_pipeline/cache/__init__.py b/surfsense_backend/app/etl_pipeline/cache/__init__.py new file mode 100644 index 000000000..3f4585778 --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/cache/__init__.py @@ -0,0 +1,11 @@ +"""Content-addressed reuse of expensive ETL parser output across workspaces.""" + +from __future__ import annotations + +from app.etl_pipeline.cache.cached_extraction import extract_with_cache +from app.etl_pipeline.cache.service import EtlCacheService + +__all__ = [ + "EtlCacheService", + "extract_with_cache", +] diff --git a/surfsense_backend/app/etl_pipeline/cache/eviction/__init__.py b/surfsense_backend/app/etl_pipeline/cache/eviction/__init__.py new file mode 100644 index 000000000..f47b9c4e0 --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/cache/eviction/__init__.py @@ -0,0 +1,9 @@ +"""Background pruning of the parse cache by age and size budget.""" + +from __future__ import annotations + +from .task import evict_etl_cache_task + +__all__ = [ + "evict_etl_cache_task", +] diff --git a/surfsense_backend/app/etl_pipeline/cache/eviction/task.py b/surfsense_backend/app/etl_pipeline/cache/eviction/task.py new file mode 100644 index 000000000..98841b139 --- /dev/null +++ b/surfsense_backend/app/etl_pipeline/cache/eviction/task.py @@ -0,0 +1,62 @@ +"""Celery task that prunes the parse cache by TTL, then by size budget.""" + +from __future__ import annotations + +import contextlib +import logging +from datetime import UTC, datetime, timedelta + +from app.celery_app import celery_app +from app.etl_pipeline.cache.eviction.policy import select_over_budget +from app.etl_pipeline.cache.persistence import CachedParseRepository +from app.etl_pipeline.cache.schemas import EvictionCandidate +from app.etl_pipeline.cache.settings import load_etl_cache_settings +from app.etl_pipeline.cache.storage import MarkdownCacheStore +from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task + +logger = logging.getLogger(__name__) + + +@celery_app.task(name="evict_etl_cache") +def evict_etl_cache_task(): + return run_async_celery_task(_evict) + + +async def _evict() -> None: + """Expire stale entries, then shed the coldest overflow only if still over budget.""" + settings = load_etl_cache_settings() + if not settings.enabled: + return + + store = MarkdownCacheStore() + async with get_celery_session_maker()() as session: + index = CachedParseRepository(session) + + cutoff = datetime.now(UTC) - timedelta(days=settings.ttl_days) + expired = await index.select_expired(cutoff=cutoff, limit=settings.eviction_batch) + await _drop(index, store, expired) + + total = await index.total_size_bytes() + if total > settings.max_total_bytes: + coldest = await index.select_coldest(limit=settings.eviction_batch) + over_budget = select_over_budget( + coldest, + current_total_bytes=total, + max_total_bytes=settings.max_total_bytes, + ) + await _drop(index, store, over_budget) + + +async def _drop( + index: CachedParseRepository, + store: MarkdownCacheStore, + candidates: list[EvictionCandidate], +) -> None: + if not candidates: + return + for candidate in candidates: + # Drop the index row even if the blob delete fails (orphan blob is harmless). + with contextlib.suppress(Exception): + await store.delete(candidate.storage_key) + await index.delete_by_ids([candidate.id for candidate in candidates]) + logger.info("Evicted %d cached parses", len(candidates))