refactor(embedding-cache): rename index cache to embedding cache

The cached payload is the indexing pipeline's embeddings (markdown is
chunked then embedded), so "embedding cache" names the expensive output
directly and removes the "index" ambiguity (DB index vs vector index vs
indexing phase). Renames the service, settings, eligibility, eviction
task, metrics, config flags (INDEX_CACHE_* -> EMBEDDING_CACHE_*), object
prefix, and the table (index_cache_embedding_sets -> embedding_cache_sets)
with its constraint and indexes. Migration 161 renamed accordingly.
This commit is contained in:
CREDO23 2026-06-12 17:00:01 +02:00
parent 8cf578d965
commit 91d947ff79
18 changed files with 93 additions and 89 deletions

View file

@ -328,19 +328,19 @@ ETL_CACHE_ENABLED=false
# ETL_CACHE_STORAGE_CONTAINER=surfsense-etl-cache
# ETL_CACHE_STORAGE_LOCAL_PATH=/var/lib/surfsense/etl-cache
# Index Cache
# Embedding Cache
# Reuse chunk+embedding output for identical markdown across workspaces (skips
# re-chunking and re-embedding). Blobs share the ETL_CACHE_STORAGE_* backend.
# Off by default.
INDEX_CACHE_ENABLED=false
EMBEDDING_CACHE_ENABLED=false
# Bump to invalidate all cached embedding sets after a chunker change.
# INDEX_CACHE_CHUNKER_VERSION=1
# EMBEDDING_CACHE_CHUNKER_VERSION=1
# Prune entries unused for this many days.
# INDEX_CACHE_TTL_DAYS=90
# EMBEDDING_CACHE_TTL_DAYS=90
# Soft cap on total cached embeddings; coldest entries are evicted past it.
# INDEX_CACHE_MAX_TOTAL_MB=5120
# EMBEDDING_CACHE_MAX_TOTAL_MB=5120
# Rows deleted per eviction pass.
# INDEX_CACHE_EVICTION_BATCH=500
# EMBEDDING_CACHE_EVICTION_BATCH=500
# Daytona Sandbox (isolated code execution)
# DAYTONA_SANDBOX_ENABLED=FALSE

View file

@ -1,4 +1,4 @@
"""add index_cache_embedding_sets table for content-addressed embedding reuse
"""add embedding_cache_sets table for content-addressed embedding reuse
Revision ID: 161
Revises: 160
@ -17,7 +17,7 @@ depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.execute(
"""
CREATE TABLE IF NOT EXISTS index_cache_embedding_sets (
CREATE TABLE IF NOT EXISTS embedding_cache_sets (
id SERIAL PRIMARY KEY,
markdown_sha256 VARCHAR(64) NOT NULL,
embedding_model VARCHAR(255) NOT NULL,
@ -31,23 +31,23 @@ def upgrade() -> None:
times_reused BIGINT NOT NULL DEFAULT 0,
last_used_at TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
CONSTRAINT uq_index_cache_embedding_sets_key
CONSTRAINT uq_embedding_cache_sets_key
UNIQUE (markdown_sha256, embedding_model, chunker_kind, chunker_version)
);
"""
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_index_cache_embedding_sets_last_used_at "
"ON index_cache_embedding_sets(last_used_at);"
"CREATE INDEX IF NOT EXISTS ix_embedding_cache_sets_last_used_at "
"ON embedding_cache_sets(last_used_at);"
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_index_cache_embedding_sets_created_at "
"ON index_cache_embedding_sets(created_at);"
"CREATE INDEX IF NOT EXISTS ix_embedding_cache_sets_created_at "
"ON embedding_cache_sets(created_at);"
)
def downgrade() -> None:
op.execute("DROP INDEX IF EXISTS ix_index_cache_embedding_sets_created_at;")
op.execute("DROP INDEX IF EXISTS ix_index_cache_embedding_sets_last_used_at;")
op.execute("DROP TABLE IF EXISTS index_cache_embedding_sets;")
op.execute("DROP INDEX IF EXISTS ix_embedding_cache_sets_created_at;")
op.execute("DROP INDEX IF EXISTS ix_embedding_cache_sets_last_used_at;")
op.execute("DROP TABLE IF EXISTS embedding_cache_sets;")

View file

@ -314,9 +314,9 @@ celery_app.conf.beat_schedule = {
"schedule": crontab(hour="4", minute="0"),
"options": {"expires": 600},
},
# Prune the index cache (chunk+embedding sets) once daily, off-peak.
"evict-index-cache": {
"task": "evict_index_cache",
# Prune the embedding cache (chunk+embedding sets) once daily, off-peak.
"evict-embedding-cache": {
"task": "evict_embedding_cache",
"schedule": crontab(hour="4", minute="30"),
"options": {"expires": 600},
},

View file

@ -964,16 +964,20 @@ class Config:
ETL_CACHE_STORAGE_CONTAINER = os.getenv("ETL_CACHE_STORAGE_CONTAINER")
ETL_CACHE_STORAGE_LOCAL_PATH = os.getenv("ETL_CACHE_STORAGE_LOCAL_PATH")
# Index cache: reuse chunk+embedding output for identical markdown across
# Embedding cache: reuse chunk+embedding output for identical markdown across
# workspaces. Blobs share the ETL_CACHE_STORAGE_* backend.
INDEX_CACHE_ENABLED = (
os.getenv("INDEX_CACHE_ENABLED", "false").strip().lower() == "true"
EMBEDDING_CACHE_ENABLED = (
os.getenv("EMBEDDING_CACHE_ENABLED", "false").strip().lower() == "true"
)
# Bump to invalidate every cached embedding set after a chunker change.
INDEX_CACHE_CHUNKER_VERSION = int(os.getenv("INDEX_CACHE_CHUNKER_VERSION", "1"))
INDEX_CACHE_TTL_DAYS = int(os.getenv("INDEX_CACHE_TTL_DAYS", "90"))
INDEX_CACHE_MAX_TOTAL_MB = int(os.getenv("INDEX_CACHE_MAX_TOTAL_MB", "5120"))
INDEX_CACHE_EVICTION_BATCH = int(os.getenv("INDEX_CACHE_EVICTION_BATCH", "500"))
EMBEDDING_CACHE_CHUNKER_VERSION = int(
os.getenv("EMBEDDING_CACHE_CHUNKER_VERSION", "1")
)
EMBEDDING_CACHE_TTL_DAYS = int(os.getenv("EMBEDDING_CACHE_TTL_DAYS", "90"))
EMBEDDING_CACHE_MAX_TOTAL_MB = int(os.getenv("EMBEDDING_CACHE_MAX_TOTAL_MB", "5120"))
EMBEDDING_CACHE_EVICTION_BATCH = int(
os.getenv("EMBEDDING_CACHE_EVICTION_BATCH", "500")
)
# Proxy provider selection. Maps to a ProxyProvider implementation registered
# in app/utils/proxy/registry.py. Add new vendors there and switch via this var.

View file

@ -3,9 +3,9 @@
from __future__ import annotations
from app.indexing_pipeline.cache.cached_indexing import build_chunk_embeddings
from app.indexing_pipeline.cache.service import IndexCacheService
from app.indexing_pipeline.cache.service import EmbeddingCacheService
__all__ = [
"IndexCacheService",
"EmbeddingCacheService",
"build_chunk_embeddings",
]

View file

@ -14,10 +14,10 @@ import logging
import numpy as np
from app.config import config
from app.indexing_pipeline.cache.eligibility import is_index_cacheable
from app.indexing_pipeline.cache.eligibility import is_embedding_cacheable
from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingKey, EmbeddingSet
from app.indexing_pipeline.cache.service import IndexCacheService
from app.indexing_pipeline.cache.settings import load_index_cache_settings
from app.indexing_pipeline.cache.service import EmbeddingCacheService
from app.indexing_pipeline.cache.settings import load_embedding_cache_settings
from app.indexing_pipeline.document_chunker import chunk_text, chunk_text_hybrid
from app.indexing_pipeline.document_embedder import embed_texts
from app.observability import metrics
@ -35,11 +35,11 @@ async def build_chunk_embeddings(
Drop-in for the inline chunk+embed step; reuses prior output when the same
markdown has already been embedded with the current model and chunker.
"""
settings = load_index_cache_settings()
settings = load_embedding_cache_settings()
chunker_kind = "code" if use_code_chunker else "hybrid"
embedding_dim = getattr(config.embedding_model_instance, "dimension", None)
cacheable = is_index_cacheable(
cacheable = is_embedding_cacheable(
cache_enabled=settings.enabled,
embedding_model=config.EMBEDDING_MODEL,
embedding_dim=embedding_dim,
@ -57,13 +57,13 @@ async def build_chunk_embeddings(
cached = await _recall(key)
if cached is not None:
metrics.record_index_cache_lookup(
metrics.record_embedding_cache_lookup(
embedding_model=key.embedding_model, chunker_kind=chunker_kind, outcome="hit"
)
logger.debug("Index cache hit for %s", key.markdown_sha256)
logger.debug("Embedding cache hit for %s", key.markdown_sha256)
return cached.summary_embedding, [(c.text, c.embedding) for c in cached.chunks]
metrics.record_index_cache_lookup(
metrics.record_embedding_cache_lookup(
embedding_model=key.embedding_model, chunker_kind=chunker_kind, outcome="miss"
)
summary_embedding, chunk_pairs = await _compute(
@ -95,9 +95,9 @@ async def _recall(key: EmbeddingKey) -> EmbeddingSet | None:
from app.tasks.celery_tasks import get_celery_session_maker
async with get_celery_session_maker()() as session:
return await IndexCacheService(session).recall(key)
return await EmbeddingCacheService(session).recall(key)
except Exception:
logger.warning("Index cache recall failed; embedding fresh", exc_info=True)
logger.warning("Embedding cache recall failed; embedding fresh", exc_info=True)
return None
@ -112,9 +112,9 @@ async def _remember(
chunks=[CachedChunk(text=text, embedding=vec) for text, vec in chunk_pairs],
)
async with get_celery_session_maker()() as session:
await IndexCacheService(session).remember(key, embedding_set)
await EmbeddingCacheService(session).remember(key, embedding_set)
except Exception:
logger.warning("Index cache write failed; result not cached", exc_info=True)
logger.warning("Embedding cache write failed; result not cached", exc_info=True)
def _hash_text(text: str) -> str:

View file

@ -1,9 +1,9 @@
"""Gating rule: may this document be served from / written to the index cache?"""
"""Gating rule: may this document be served from / written to the embedding cache?"""
from __future__ import annotations
def is_index_cacheable(
def is_embedding_cacheable(
*,
cache_enabled: bool,
embedding_model: str | None,

View file

@ -1,9 +1,9 @@
"""Background pruning of the index cache by age and size budget."""
"""Background pruning of the embedding cache by age and size budget."""
from __future__ import annotations
from .task import evict_index_cache_task
from .task import evict_embedding_cache_task
__all__ = [
"evict_index_cache_task",
"evict_embedding_cache_task",
]

View file

@ -1,4 +1,4 @@
"""Celery task that prunes the index cache by TTL, then by size budget."""
"""Celery task that prunes the embedding cache by TTL, then by size budget."""
from __future__ import annotations
@ -10,7 +10,7 @@ from app.celery_app import celery_app
from app.etl_pipeline.cache.eviction.policy import select_over_budget
from app.etl_pipeline.cache.schemas import EvictionCandidate
from app.indexing_pipeline.cache.persistence import CachedEmbeddingSetRepository
from app.indexing_pipeline.cache.settings import load_index_cache_settings
from app.indexing_pipeline.cache.settings import load_embedding_cache_settings
from app.indexing_pipeline.cache.storage import EmbeddingCacheStore
from app.observability import metrics
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
@ -18,14 +18,14 @@ from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_ta
logger = logging.getLogger(__name__)
@celery_app.task(name="evict_index_cache")
def evict_index_cache_task():
@celery_app.task(name="evict_embedding_cache")
def evict_embedding_cache_task():
return run_async_celery_task(_evict)
async def _evict() -> None:
"""Expire stale entries, then shed the coldest overflow only if still over budget."""
settings = load_index_cache_settings()
settings = load_embedding_cache_settings()
if not settings.enabled:
return
@ -64,5 +64,5 @@ async def _drop(
with contextlib.suppress(Exception):
await store.delete(candidate.storage_key)
await index.delete_by_ids([candidate.id for candidate in candidates])
metrics.record_index_cache_eviction(len(candidates), phase=phase)
metrics.record_embedding_cache_eviction(len(candidates), phase=phase)
logger.info("Evicted %d cached embedding sets (%s)", len(candidates), phase)

View file

@ -1,4 +1,4 @@
"""``index_cache_embedding_sets``: one reusable chunk+embedding set per markdown."""
"""``embedding_cache_sets``: one reusable chunk+embedding set per markdown."""
from __future__ import annotations
@ -16,7 +16,7 @@ from app.db import BaseModel, TimestampMixin
class CachedEmbeddingSet(BaseModel, TimestampMixin):
__tablename__ = "index_cache_embedding_sets"
__tablename__ = "embedding_cache_sets"
# Key: markdown text + the recipe that turned it into vectors.
markdown_sha256 = Column(String(64), nullable=False)
@ -41,7 +41,7 @@ class CachedEmbeddingSet(BaseModel, TimestampMixin):
"embedding_model",
"chunker_kind",
"chunker_version",
name="uq_index_cache_embedding_sets_key",
name="uq_embedding_cache_sets_key",
),
Index("ix_index_cache_embedding_sets_last_used_at", "last_used_at"),
Index("ix_embedding_cache_sets_last_used_at", "last_used_at"),
)

View file

@ -1,4 +1,4 @@
"""CRUD and eviction selectors for ``index_cache_embedding_sets`` (no business rules)."""
"""CRUD and eviction selectors for ``embedding_cache_sets`` (no business rules)."""
from __future__ import annotations
@ -74,7 +74,7 @@ class CachedEmbeddingSetRepository:
last_used_at=now,
created_at=now,
)
.on_conflict_do_nothing(constraint="uq_index_cache_embedding_sets_key")
.on_conflict_do_nothing(constraint="uq_embedding_cache_sets_key")
)
await self._session.commit()

View file

@ -1,4 +1,4 @@
"""Pure value objects for the index cache."""
"""Pure value objects for the embedding cache."""
from __future__ import annotations

View file

@ -13,7 +13,7 @@ from app.indexing_pipeline.cache.storage import EmbeddingCacheStore
logger = logging.getLogger(__name__)
class IndexCacheService:
class EmbeddingCacheService:
def __init__(self, session: AsyncSession) -> None:
self._index = CachedEmbeddingSetRepository(session)
self._store = EmbeddingCacheStore()

View file

@ -1,4 +1,4 @@
"""Index-cache configuration resolved from the central ``Config``.
"""Embedding-cache configuration resolved from the central ``Config``.
The blob backend is intentionally not configured here: it is shared with the ETL
parse cache (see ``ETL_CACHE_STORAGE_*``).
@ -10,7 +10,7 @@ from dataclasses import dataclass
@dataclass(frozen=True)
class IndexCacheSettings:
class EmbeddingCacheSettings:
enabled: bool
chunker_version: int
ttl_days: int
@ -18,13 +18,13 @@ class IndexCacheSettings:
eviction_batch: int
def load_index_cache_settings() -> IndexCacheSettings:
def load_embedding_cache_settings() -> EmbeddingCacheSettings:
from app.config import config
return IndexCacheSettings(
enabled=config.INDEX_CACHE_ENABLED,
chunker_version=config.INDEX_CACHE_CHUNKER_VERSION,
ttl_days=config.INDEX_CACHE_TTL_DAYS,
max_total_bytes=config.INDEX_CACHE_MAX_TOTAL_MB * 1024 * 1024,
eviction_batch=config.INDEX_CACHE_EVICTION_BATCH,
return EmbeddingCacheSettings(
enabled=config.EMBEDDING_CACHE_ENABLED,
chunker_version=config.EMBEDDING_CACHE_CHUNKER_VERSION,
ttl_days=config.EMBEDDING_CACHE_TTL_DAYS,
max_total_bytes=config.EMBEDDING_CACHE_MAX_TOTAL_MB * 1024 * 1024,
eviction_batch=config.EMBEDDING_CACHE_EVICTION_BATCH,
)

View file

@ -7,8 +7,8 @@ markdown and its embeddings live side by side; only the object prefix differs.
from __future__ import annotations
from app.etl_pipeline.cache.storage.backend import resolve_cache_backend
from app.indexing_pipeline.cache.serialization import deserialize, serialize
from app.indexing_pipeline.cache.schemas import EmbeddingKey, EmbeddingSet
from app.indexing_pipeline.cache.serialization import deserialize, serialize
from app.indexing_pipeline.cache.storage.object_keys import build_embedding_object_key
_EMBEDDING_CONTENT_TYPE = "application/octet-stream"

View file

@ -4,7 +4,7 @@ from __future__ import annotations
from app.indexing_pipeline.cache.schemas import EmbeddingKey
CACHE_PREFIX = "index_cache"
CACHE_PREFIX = "embedding_cache"
def build_embedding_object_key(key: EmbeddingKey) -> str:

View file

@ -306,18 +306,18 @@ def _etl_cache_evictions():
@lru_cache(maxsize=1)
def _index_cache_lookups():
def _embedding_cache_lookups():
return _get_meter().create_counter(
"surfsense.index.cache.lookups",
description="Count of index (chunk+embedding) cache lookups by outcome (hit/miss).",
"surfsense.embedding.cache.lookups",
description="Count of embedding (chunk+embedding) cache lookups by outcome (hit/miss).",
)
@lru_cache(maxsize=1)
def _index_cache_evictions():
def _embedding_cache_evictions():
return _get_meter().create_counter(
"surfsense.index.cache.evictions",
description="Count of index cache entries evicted, by phase.",
"surfsense.embedding.cache.evictions",
description="Count of embedding cache entries evicted, by phase.",
)
@ -724,12 +724,12 @@ def record_etl_cache_eviction(count: int, *, phase: str) -> None:
_add(_etl_cache_evictions(), count, {"phase": phase})
def record_index_cache_lookup(
def record_embedding_cache_lookup(
*, embedding_model: str | None, chunker_kind: str | None, outcome: str
) -> None:
"""Record an index-cache lookup. ``outcome`` is ``hit`` or ``miss``."""
"""Record an embedding-cache lookup. ``outcome`` is ``hit`` or ``miss``."""
_add(
_index_cache_lookups(),
_embedding_cache_lookups(),
1,
{
"embedding.model": embedding_model or "unknown",
@ -739,11 +739,11 @@ def record_index_cache_lookup(
)
def record_index_cache_eviction(count: int, *, phase: str) -> None:
def record_embedding_cache_eviction(count: int, *, phase: str) -> None:
"""Record evicted entries. ``phase`` is ``ttl`` or ``size``."""
if count <= 0:
return
_add(_index_cache_evictions(), count, {"phase": phase})
_add(_embedding_cache_evictions(), count, {"phase": phase})
def record_celery_heartbeat_refresh(*, heartbeat_type: str) -> None:
@ -942,12 +942,12 @@ __all__ = [
"record_compaction_run",
"record_connector_sync_duration",
"record_connector_sync_outcome",
"record_embedding_cache_eviction",
"record_embedding_cache_lookup",
"record_etl_cache_eviction",
"record_etl_cache_lookup",
"record_etl_extract_duration",
"record_etl_extract_outcome",
"record_index_cache_eviction",
"record_index_cache_lookup",
"record_indexing_document_duration",
"record_indexing_document_outcome",
"record_interrupt",

View file

@ -1,28 +1,28 @@
from app.indexing_pipeline.cache.eligibility import is_index_cacheable
from app.indexing_pipeline.cache.eligibility import is_embedding_cacheable
def test_disabled_cache_is_never_cacheable():
assert not is_index_cacheable(
assert not is_embedding_cacheable(
cache_enabled=False, embedding_model="m", embedding_dim=384
)
def test_missing_model_is_not_cacheable():
assert not is_index_cacheable(
assert not is_embedding_cacheable(
cache_enabled=True, embedding_model=None, embedding_dim=384
)
def test_missing_dimension_is_not_cacheable():
assert not is_index_cacheable(
assert not is_embedding_cacheable(
cache_enabled=True, embedding_model="m", embedding_dim=None
)
assert not is_index_cacheable(
assert not is_embedding_cacheable(
cache_enabled=True, embedding_model="m", embedding_dim=0
)
def test_enabled_with_model_and_dim_is_cacheable():
assert is_index_cacheable(
assert is_embedding_cacheable(
cache_enabled=True, embedding_model="m", embedding_dim=384
)