From f5411145446aa253cfe21802ce1666f484a0a006 Mon Sep 17 00:00:00 2001
From: CREDO23
Date: Fri, 12 Jun 2026 16:48:01 +0200
Subject: [PATCH] feat(index-cache): add cached embedding set table and repository

---
 .../161_add_index_cache_embedding_sets.py     |  53 ++++++++
 surfsense_backend/app/db.py                   |   3 +
 .../cache/persistence/__init__.py             |  11 ++
 .../cache/persistence/models.py               |  47 +++++++
 .../cache/persistence/repository.py           | 126 ++++++++++++++++++
 5 files changed, 240 insertions(+)
 create mode 100644 surfsense_backend/alembic/versions/161_add_index_cache_embedding_sets.py
 create mode 100644 surfsense_backend/app/indexing_pipeline/cache/persistence/__init__.py
 create mode 100644 surfsense_backend/app/indexing_pipeline/cache/persistence/models.py
 create mode 100644 surfsense_backend/app/indexing_pipeline/cache/persistence/repository.py

diff --git a/surfsense_backend/alembic/versions/161_add_index_cache_embedding_sets.py b/surfsense_backend/alembic/versions/161_add_index_cache_embedding_sets.py
new file mode 100644
index 000000000..8441dcf6e
--- /dev/null
+++ b/surfsense_backend/alembic/versions/161_add_index_cache_embedding_sets.py
@@ -0,0 +1,53 @@
+"""add index_cache_embedding_sets table for content-addressed embedding reuse
+
+Revision ID: 161
+Revises: 160
+"""
+
+from collections.abc import Sequence
+
+from alembic import op
+
+revision: str = "161"
+down_revision: str | None = "160"
+branch_labels: str | Sequence[str] | None = None
+depends_on: str | Sequence[str] | None = None
+
+
+def upgrade() -> None:  # re-runnable: every statement below uses IF NOT EXISTS
+    op.execute(  # cache metadata table; unique on (sha256, model, chunker kind, chunker version)
+        """
+        CREATE TABLE IF NOT EXISTS index_cache_embedding_sets (
+            id SERIAL PRIMARY KEY,
+            markdown_sha256 VARCHAR(64) NOT NULL,
+            embedding_model VARCHAR(255) NOT NULL,
+            embedding_dim INTEGER NOT NULL,
+            chunker_kind VARCHAR(8) NOT NULL,
+            chunker_version INTEGER NOT NULL,
+            storage_backend VARCHAR(32) NOT NULL,
+            storage_key TEXT NOT NULL,
+            size_bytes BIGINT NOT NULL,
+            chunk_count INTEGER NOT NULL DEFAULT 0,
+            times_reused BIGINT NOT NULL DEFAULT 0,
+            last_used_at TIMESTAMP WITH TIME ZONE NOT NULL,
+            created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
+            CONSTRAINT uq_index_cache_embedding_sets_key
+                UNIQUE (markdown_sha256, embedding_model, chunker_kind, chunker_version)
+        );
+        """
+    )
+
+    op.execute(  # supports the last_used_at filter/sort used by the repository's eviction selectors
+        "CREATE INDEX IF NOT EXISTS ix_index_cache_embedding_sets_last_used_at "
+        "ON index_cache_embedding_sets(last_used_at);"
+    )
+    op.execute(  # NOTE(review): no query in this patch filters on created_at — confirm the consumer
+        "CREATE INDEX IF NOT EXISTS ix_index_cache_embedding_sets_created_at "
+        "ON index_cache_embedding_sets(created_at);"
+    )
+
+
+def downgrade() -> None:  # drops both indexes, then the table; IF EXISTS keeps it re-runnable
+    op.execute("DROP INDEX IF EXISTS ix_index_cache_embedding_sets_created_at;")
+    op.execute("DROP INDEX IF EXISTS ix_index_cache_embedding_sets_last_used_at;")
+    op.execute("DROP TABLE IF EXISTS index_cache_embedding_sets;")
diff --git a/surfsense_backend/app/db.py b/surfsense_backend/app/db.py
index 97843d395..9ec13f4e2 100644
--- a/surfsense_backend/app/db.py
+++ b/surfsense_backend/app/db.py
@@ -2866,6 +2866,9 @@ from app.automations.persistence import (  # noqa: E402, F401
 )
 from app.etl_pipeline.cache.persistence.models import CachedParse  # noqa: E402, F401
 from app.file_storage.persistence import DocumentFile  # noqa: E402, F401
+from app.indexing_pipeline.cache.persistence.models import (  # noqa: E402, F401
+    CachedEmbeddingSet,
+)
 from app.notifications.persistence import Notification  # noqa: E402, F401
 from app.podcasts.persistence import (  # noqa: E402, F401
     Podcast,
diff --git a/surfsense_backend/app/indexing_pipeline/cache/persistence/__init__.py b/surfsense_backend/app/indexing_pipeline/cache/persistence/__init__.py
new file mode 100644
index 000000000..62cde0d05
--- /dev/null
+++ b/surfsense_backend/app/indexing_pipeline/cache/persistence/__init__.py
@@ -0,0 +1,11 @@
+"""Database access for cached embedding sets."""
+
+from __future__ import annotations
+
+from .models import CachedEmbeddingSet
+from .repository import CachedEmbeddingSetRepository
+
+__all__ = [  # the ORM model and its repository are the package's only exports
+    "CachedEmbeddingSet",
+    "CachedEmbeddingSetRepository",
+]
diff --git a/surfsense_backend/app/indexing_pipeline/cache/persistence/models.py b/surfsense_backend/app/indexing_pipeline/cache/persistence/models.py
new file mode 100644
index 000000000..e33e470f0
--- /dev/null
+++ b/surfsense_backend/app/indexing_pipeline/cache/persistence/models.py
@@ -0,0 +1,47 @@
+"""``index_cache_embedding_sets``: one reusable chunk+embedding set per markdown."""
+
+from __future__ import annotations
+
+from sqlalchemy import (
+    BigInteger,
+    Column,
+    DateTime,
+    Index,
+    Integer,
+    String,
+    UniqueConstraint,
+)
+
+from app.db import BaseModel, TimestampMixin
+
+
+class CachedEmbeddingSet(BaseModel, TimestampMixin):  # id/created_at presumably come from the bases — confirm in app.db
+    __tablename__ = "index_cache_embedding_sets"
+
+    # Key: markdown text + the recipe that turned it into vectors.
+    markdown_sha256 = Column(String(64), nullable=False)
+    embedding_model = Column(String(255), nullable=False)
+    embedding_dim = Column(Integer, nullable=False)  # stored, but not part of the unique key below
+    chunker_kind = Column(String(8), nullable=False)
+    chunker_version = Column(Integer, nullable=False)
+
+    # Where the embedding blob lives (kept out of the row to stay small).
+    storage_backend = Column(String(32), nullable=False)
+    storage_key = Column(String, nullable=False)  # no length limit; the migration declares TEXT
+    size_bytes = Column(BigInteger, nullable=False)
+    chunk_count = Column(Integer, nullable=False, default=0, server_default="0")
+
+    # Drives eviction (popularity + recency).
+    times_reused = Column(BigInteger, nullable=False, default=0, server_default="0")
+    last_used_at = Column(DateTime(timezone=True), nullable=False)  # no default: writers must supply it
+
+    __table_args__ = (
+        UniqueConstraint(  # same columns and name as the constraint created by migration 161
+            "markdown_sha256",
+            "embedding_model",
+            "chunker_kind",
+            "chunker_version",
+            name="uq_index_cache_embedding_sets_key",
+        ),
+        Index("ix_index_cache_embedding_sets_last_used_at", "last_used_at"),  # created_at index: only in the migration here
+    )
diff --git a/surfsense_backend/app/indexing_pipeline/cache/persistence/repository.py b/surfsense_backend/app/indexing_pipeline/cache/persistence/repository.py
new file mode 100644
index 000000000..0bb0f8f23
--- /dev/null
+++ b/surfsense_backend/app/indexing_pipeline/cache/persistence/repository.py
@@ -0,0 +1,126 @@
+"""CRUD and eviction selectors for ``index_cache_embedding_sets`` (no business rules)."""
+
+from __future__ import annotations
+
+from datetime import UTC, datetime
+
+from sqlalchemy import delete, func, select, update
+from sqlalchemy.dialects.postgresql import insert as pg_insert
+from sqlalchemy.ext.asyncio import AsyncSession
+
+from app.etl_pipeline.cache.schemas import EvictionCandidate
+from app.indexing_pipeline.cache.schemas import EmbeddingKey
+
+from .models import CachedEmbeddingSet
+
+_EVICTION_COLUMNS = (  # column subset projected by select_expired / select_coldest
+    CachedEmbeddingSet.id,
+    CachedEmbeddingSet.storage_key,
+    CachedEmbeddingSet.size_bytes,
+    CachedEmbeddingSet.last_used_at,
+    CachedEmbeddingSet.times_reused,
+)
+
+
+def _as_eviction_candidate(row) -> EvictionCandidate:  # row: result row exposing the _EVICTION_COLUMNS attributes
+    return EvictionCandidate(
+        id=row.id,
+        storage_key=row.storage_key,
+        size_bytes=row.size_bytes,
+        last_used_at=row.last_used_at,
+        times_reused=row.times_reused,
+    )
+
+
+class CachedEmbeddingSetRepository:  # every mutating method commits the session it was given
+    def __init__(self, session: AsyncSession) -> None:
+        self._session = session
+
+    async def get(self, key: EmbeddingKey) -> CachedEmbeddingSet | None:
+        result = await self._session.execute(
+            select(CachedEmbeddingSet).where(  # matches the unique-key columns; key.embedding_dim is not compared
+                CachedEmbeddingSet.markdown_sha256 == key.markdown_sha256,
+                CachedEmbeddingSet.embedding_model == key.embedding_model,
+                CachedEmbeddingSet.chunker_kind == key.chunker_kind,
+                CachedEmbeddingSet.chunker_version == key.chunker_version,
+            )
+        )
+        return result.scalars().first()  # None when no row matches
+
+    async def insert(
+        self,
+        *,
+        key: EmbeddingKey,
+        storage_backend: str,
+        storage_key: str,
+        size_bytes: int,
+        chunk_count: int,
+    ) -> None:
+        # Concurrent writers embed identical markdown, so a lost race is harmless.
+        now = datetime.now(UTC)  # one timestamp shared by last_used_at and created_at
+        await self._session.execute(
+            pg_insert(CachedEmbeddingSet)
+            .values(
+                markdown_sha256=key.markdown_sha256,
+                embedding_model=key.embedding_model,
+                embedding_dim=key.embedding_dim,
+                chunker_kind=key.chunker_kind,
+                chunker_version=key.chunker_version,
+                storage_backend=storage_backend,
+                storage_key=storage_key,
+                size_bytes=size_bytes,
+                chunk_count=chunk_count,
+                times_reused=0,
+                last_used_at=now,
+                created_at=now,
+            )
+            .on_conflict_do_nothing(constraint="uq_index_cache_embedding_sets_key")  # existing row is kept as-is
+        )
+        await self._session.commit()  # returns None either way: the caller cannot tell whether a row was written
+
+    async def mark_used(self, row_id: int) -> None:
+        await self._session.execute(
+            update(CachedEmbeddingSet)
+            .where(CachedEmbeddingSet.id == row_id)
+            .values(
+                times_reused=CachedEmbeddingSet.times_reused + 1,  # column expression: the increment runs in SQL
+                last_used_at=datetime.now(UTC),
+            )
+        )
+        await self._session.commit()
+
+    async def total_size_bytes(self) -> int:
+        result = await self._session.execute(
+            select(func.coalesce(func.sum(CachedEmbeddingSet.size_bytes), 0))  # COALESCE turns an empty-table NULL into 0
+        )
+        return int(result.scalar() or 0)
+
+    async def select_expired(
+        self, *, cutoff: datetime, limit: int
+    ) -> list[EvictionCandidate]:
+        result = await self._session.execute(
+            select(*_EVICTION_COLUMNS)
+            .where(CachedEmbeddingSet.last_used_at < cutoff)  # strictly older than cutoff
+            .order_by(CachedEmbeddingSet.last_used_at.asc())  # least recently used first
+            .limit(limit)
+        )
+        return [_as_eviction_candidate(row) for row in result]
+
+    async def select_coldest(self, *, limit: int) -> list[EvictionCandidate]:
+        result = await self._session.execute(
+            select(*_EVICTION_COLUMNS)
+            .order_by(  # least reused first; ties broken by oldest last_used_at
+                CachedEmbeddingSet.times_reused.asc(),
+                CachedEmbeddingSet.last_used_at.asc(),
+            )
+            .limit(limit)
+        )
+        return [_as_eviction_candidate(row) for row in result]
+
+    async def delete_by_ids(self, ids: list[int]) -> None:
+        if not ids:  # empty list: skip both the DELETE and the commit
+            return
+        await self._session.execute(
+            delete(CachedEmbeddingSet).where(CachedEmbeddingSet.id.in_(ids))
+        )
+        await self._session.commit()