feat(index-cache): add cached embedding set table and repository

This commit is contained in:
CREDO23 2026-06-12 16:48:01 +02:00
parent 59fa4c38c3
commit f541114544
5 changed files with 240 additions and 0 deletions

View file

@ -0,0 +1,53 @@
"""add index_cache_embedding_sets table for content-addressed embedding reuse
Revision ID: 161
Revises: 160
"""
from collections.abc import Sequence
from alembic import op
revision: str = "161"
down_revision: str | None = "160"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.execute(
"""
CREATE TABLE IF NOT EXISTS index_cache_embedding_sets (
id SERIAL PRIMARY KEY,
markdown_sha256 VARCHAR(64) NOT NULL,
embedding_model VARCHAR(255) NOT NULL,
embedding_dim INTEGER NOT NULL,
chunker_kind VARCHAR(8) NOT NULL,
chunker_version INTEGER NOT NULL,
storage_backend VARCHAR(32) NOT NULL,
storage_key TEXT NOT NULL,
size_bytes BIGINT NOT NULL,
chunk_count INTEGER NOT NULL DEFAULT 0,
times_reused BIGINT NOT NULL DEFAULT 0,
last_used_at TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
CONSTRAINT uq_index_cache_embedding_sets_key
UNIQUE (markdown_sha256, embedding_model, chunker_kind, chunker_version)
);
"""
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_index_cache_embedding_sets_last_used_at "
"ON index_cache_embedding_sets(last_used_at);"
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_index_cache_embedding_sets_created_at "
"ON index_cache_embedding_sets(created_at);"
)
def downgrade() -> None:
op.execute("DROP INDEX IF EXISTS ix_index_cache_embedding_sets_created_at;")
op.execute("DROP INDEX IF EXISTS ix_index_cache_embedding_sets_last_used_at;")
op.execute("DROP TABLE IF EXISTS index_cache_embedding_sets;")

View file

@ -2866,6 +2866,9 @@ from app.automations.persistence import ( # noqa: E402, F401
)
from app.etl_pipeline.cache.persistence.models import CachedParse # noqa: E402, F401
from app.file_storage.persistence import DocumentFile # noqa: E402, F401
from app.indexing_pipeline.cache.persistence.models import ( # noqa: E402, F401
CachedEmbeddingSet,
)
from app.notifications.persistence import Notification # noqa: E402, F401
from app.podcasts.persistence import ( # noqa: E402, F401
Podcast,

View file

@ -0,0 +1,11 @@
"""Database access for cached embedding sets."""
from __future__ import annotations
from .models import CachedEmbeddingSet
from .repository import CachedEmbeddingSetRepository
__all__ = [
"CachedEmbeddingSet",
"CachedEmbeddingSetRepository",
]

View file

@ -0,0 +1,47 @@
"""``index_cache_embedding_sets``: one reusable chunk+embedding set per markdown."""
from __future__ import annotations
from sqlalchemy import (
BigInteger,
Column,
DateTime,
Index,
Integer,
String,
UniqueConstraint,
)
from app.db import BaseModel, TimestampMixin
class CachedEmbeddingSet(BaseModel, TimestampMixin):
__tablename__ = "index_cache_embedding_sets"
# Key: markdown text + the recipe that turned it into vectors.
markdown_sha256 = Column(String(64), nullable=False)
embedding_model = Column(String(255), nullable=False)
embedding_dim = Column(Integer, nullable=False)
chunker_kind = Column(String(8), nullable=False)
chunker_version = Column(Integer, nullable=False)
# Where the embedding blob lives (kept out of the row to stay small).
storage_backend = Column(String(32), nullable=False)
storage_key = Column(String, nullable=False)
size_bytes = Column(BigInteger, nullable=False)
chunk_count = Column(Integer, nullable=False, default=0, server_default="0")
# Drives eviction (popularity + recency).
times_reused = Column(BigInteger, nullable=False, default=0, server_default="0")
last_used_at = Column(DateTime(timezone=True), nullable=False)
__table_args__ = (
UniqueConstraint(
"markdown_sha256",
"embedding_model",
"chunker_kind",
"chunker_version",
name="uq_index_cache_embedding_sets_key",
),
Index("ix_index_cache_embedding_sets_last_used_at", "last_used_at"),
)

View file

@ -0,0 +1,126 @@
"""CRUD and eviction selectors for ``index_cache_embedding_sets`` (no business rules)."""
from __future__ import annotations
from datetime import UTC, datetime
from sqlalchemy import delete, func, select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from app.etl_pipeline.cache.schemas import EvictionCandidate
from app.indexing_pipeline.cache.schemas import EmbeddingKey
from .models import CachedEmbeddingSet
_EVICTION_COLUMNS = (
CachedEmbeddingSet.id,
CachedEmbeddingSet.storage_key,
CachedEmbeddingSet.size_bytes,
CachedEmbeddingSet.last_used_at,
CachedEmbeddingSet.times_reused,
)
def _as_eviction_candidate(row) -> EvictionCandidate:
return EvictionCandidate(
id=row.id,
storage_key=row.storage_key,
size_bytes=row.size_bytes,
last_used_at=row.last_used_at,
times_reused=row.times_reused,
)
class CachedEmbeddingSetRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get(self, key: EmbeddingKey) -> CachedEmbeddingSet | None:
result = await self._session.execute(
select(CachedEmbeddingSet).where(
CachedEmbeddingSet.markdown_sha256 == key.markdown_sha256,
CachedEmbeddingSet.embedding_model == key.embedding_model,
CachedEmbeddingSet.chunker_kind == key.chunker_kind,
CachedEmbeddingSet.chunker_version == key.chunker_version,
)
)
return result.scalars().first()
async def insert(
self,
*,
key: EmbeddingKey,
storage_backend: str,
storage_key: str,
size_bytes: int,
chunk_count: int,
) -> None:
# Concurrent writers embed identical markdown, so a lost race is harmless.
now = datetime.now(UTC)
await self._session.execute(
pg_insert(CachedEmbeddingSet)
.values(
markdown_sha256=key.markdown_sha256,
embedding_model=key.embedding_model,
embedding_dim=key.embedding_dim,
chunker_kind=key.chunker_kind,
chunker_version=key.chunker_version,
storage_backend=storage_backend,
storage_key=storage_key,
size_bytes=size_bytes,
chunk_count=chunk_count,
times_reused=0,
last_used_at=now,
created_at=now,
)
.on_conflict_do_nothing(constraint="uq_index_cache_embedding_sets_key")
)
await self._session.commit()
async def mark_used(self, row_id: int) -> None:
await self._session.execute(
update(CachedEmbeddingSet)
.where(CachedEmbeddingSet.id == row_id)
.values(
times_reused=CachedEmbeddingSet.times_reused + 1,
last_used_at=datetime.now(UTC),
)
)
await self._session.commit()
async def total_size_bytes(self) -> int:
result = await self._session.execute(
select(func.coalesce(func.sum(CachedEmbeddingSet.size_bytes), 0))
)
return int(result.scalar() or 0)
async def select_expired(
self, *, cutoff: datetime, limit: int
) -> list[EvictionCandidate]:
result = await self._session.execute(
select(*_EVICTION_COLUMNS)
.where(CachedEmbeddingSet.last_used_at < cutoff)
.order_by(CachedEmbeddingSet.last_used_at.asc())
.limit(limit)
)
return [_as_eviction_candidate(row) for row in result]
async def select_coldest(self, *, limit: int) -> list[EvictionCandidate]:
result = await self._session.execute(
select(*_EVICTION_COLUMNS)
.order_by(
CachedEmbeddingSet.times_reused.asc(),
CachedEmbeddingSet.last_used_at.asc(),
)
.limit(limit)
)
return [_as_eviction_candidate(row) for row in result]
async def delete_by_ids(self, ids: list[int]) -> None:
if not ids:
return
await self._session.execute(
delete(CachedEmbeddingSet).where(CachedEmbeddingSet.id.in_(ids))
)
await self._session.commit()