feat(etl-cache): emit hit/miss and eviction metrics

This commit is contained in:
CREDO23 2026-06-12 11:57:03 +02:00
parent 9efe24879d
commit 0808fbcdee
2 changed files with 15 additions and 3 deletions

View file

@ -13,6 +13,7 @@ from app.etl_pipeline.cache.service import EtlCacheService
from app.etl_pipeline.cache.settings import load_etl_cache_settings
from app.etl_pipeline.etl_document import EtlRequest, EtlResult
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.observability import metrics
logger = logging.getLogger(__name__)
@ -43,8 +44,15 @@ async def extract_with_cache(
cached_result = await _recall(key)
if cached_result is not None:
metrics.record_etl_cache_lookup(
etl_service=key.etl_service, mode=key.mode, outcome="hit"
)
logger.debug("ETL cache hit for %s", key.source_sha256)
return cached_result
metrics.record_etl_cache_lookup(
etl_service=key.etl_service, mode=key.mode, outcome="miss"
)
result = await EtlPipelineService(vision_llm=vision_llm).extract(request)
await _remember(key, result)
return result

View file

@ -12,6 +12,7 @@ from app.etl_pipeline.cache.persistence import CachedParseRepository
from app.etl_pipeline.cache.schemas import EvictionCandidate
from app.etl_pipeline.cache.settings import load_etl_cache_settings
from app.etl_pipeline.cache.storage import MarkdownCacheStore
from app.observability import metrics
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
logger = logging.getLogger(__name__)
@ -34,7 +35,7 @@ async def _evict() -> None:
cutoff = datetime.now(UTC) - timedelta(days=settings.ttl_days)
expired = await index.select_expired(cutoff=cutoff, limit=settings.eviction_batch)
await _drop(index, store, expired)
await _drop(index, store, expired, phase="ttl")
total = await index.total_size_bytes()
if total > settings.max_total_bytes:
@ -44,13 +45,15 @@ async def _evict() -> None:
current_total_bytes=total,
max_total_bytes=settings.max_total_bytes,
)
await _drop(index, store, over_budget)
await _drop(index, store, over_budget, phase="size")
async def _drop(
index: CachedParseRepository,
store: MarkdownCacheStore,
candidates: list[EvictionCandidate],
*,
phase: str,
) -> None:
if not candidates:
return
@ -59,4 +62,5 @@ async def _drop(
with contextlib.suppress(Exception):
await store.delete(candidate.storage_key)
await index.delete_by_ids([candidate.id for candidate in candidates])
logger.info("Evicted %d cached parses", len(candidates))
metrics.record_etl_cache_eviction(len(candidates), phase=phase)
logger.info("Evicted %d cached parses (%s)", len(candidates), phase)