test(embedding-cache): add integration tests for service, repository, and store

Covers the public cache surface against real Postgres and a real local file
backend (no mocks): recall miss, remember->recall vector/text/order round-trip,
the dimension-mismatch refusal, the repository SQL behind eviction and dedup
(size sum, coldest ordering, TTL cutoff, duplicate-key no-op, reuse counter),
and the blob store save/load round-trip and delete.
This commit is contained in:
CREDO23 2026-06-12 17:33:21 +02:00
parent 91d947ff79
commit 412493ae08
4 changed files with 276 additions and 0 deletions

View file

@ -0,0 +1,33 @@
"""Real-infra fixtures for the embedding-cache integration tests.
``cache_local_storage`` points the shared cache backend at a throwaway directory
so tests exercise the real ``LocalFileBackend`` (no cloud, no mocks); the
embedding cache reuses the ETL cache backend, hence the ``ETL_CACHE_STORAGE_*``
knobs. ``clean_embedding_cache_table`` removes rows written through the store's
own committing session, which the savepoint-rolled-back ``db_session`` cannot undo.
"""
from __future__ import annotations
import pytest
import pytest_asyncio
from sqlalchemy import text
@pytest.fixture
def cache_local_storage(tmp_path, monkeypatch):
from app.config import config
from app.etl_pipeline.cache.storage.backend import resolve_cache_backend
monkeypatch.setattr(config, "ETL_CACHE_STORAGE_BACKEND", "local")
monkeypatch.setattr(config, "ETL_CACHE_STORAGE_LOCAL_PATH", str(tmp_path))
resolve_cache_backend.cache_clear()
yield tmp_path
resolve_cache_backend.cache_clear()
@pytest_asyncio.fixture
async def clean_embedding_cache_table(async_engine):
yield
async with async_engine.begin() as conn:
await conn.execute(text("DELETE FROM embedding_cache_sets"))

View file

@ -0,0 +1,110 @@
"""CachedEmbeddingSetRepository against real Postgres: the SQL behind eviction & dedup.
These verify the parts only a real database can: the size accumulator,
coldest-first ordering by reuse then recency, TTL cutoff selection, the
insert-once guarantee under a duplicate key, and the reuse counter.
"""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import pytest
from app.indexing_pipeline.cache.persistence import CachedEmbeddingSetRepository
from app.indexing_pipeline.cache.schemas import EmbeddingKey
pytestmark = pytest.mark.integration
def _key(sha: str) -> EmbeddingKey:
return EmbeddingKey(
markdown_sha256=sha,
embedding_model="test-model",
embedding_dim=4,
chunker_kind="hybrid",
chunker_version=1,
)
async def _insert(repo, *, sha, size=100, storage_key=None, chunk_count=1):
key = _key(sha)
await repo.insert(
key=key,
storage_backend="local",
storage_key=storage_key or f"embedding_cache/{sha}.emb",
size_bytes=size,
chunk_count=chunk_count,
)
return key
async def test_total_size_bytes_sums_all_rows(db_session):
repo = CachedEmbeddingSetRepository(db_session)
await _insert(repo, sha="a" * 64, size=100)
await _insert(repo, sha="b" * 64, size=250)
assert await repo.total_size_bytes() == 350
async def test_select_coldest_orders_by_reuse_then_recency(db_session):
repo = CachedEmbeddingSetRepository(db_session)
ka = await _insert(repo, sha="a" * 64)
kb = await _insert(repo, sha="b" * 64)
kc = await _insert(repo, sha="c" * 64)
# Warm B once and C twice; A stays untouched and should be coldest.
await repo.mark_used((await repo.get(kb)).id)
await repo.mark_used((await repo.get(kc)).id)
await repo.mark_used((await repo.get(kc)).id)
coldest = await repo.select_coldest(limit=10)
assert [c.id for c in coldest][:3] == [
(await repo.get(ka)).id,
(await repo.get(kb)).id,
(await repo.get(kc)).id,
]
async def test_select_expired_returns_only_rows_older_than_cutoff(db_session):
repo = CachedEmbeddingSetRepository(db_session)
await _insert(repo, sha="a" * 64)
future = datetime.now(UTC) + timedelta(days=1)
past = datetime.now(UTC) - timedelta(days=1)
# Row was just used, so it predates a future cutoff but not a past one.
assert len(await repo.select_expired(cutoff=future, limit=10)) == 1
assert await repo.select_expired(cutoff=past, limit=10) == []
async def test_duplicate_key_insert_keeps_the_first_row(db_session):
repo = CachedEmbeddingSetRepository(db_session)
key = await _insert(
repo, sha="a" * 64, size=100, storage_key="embedding_cache/first.emb"
)
# Same content-addressed key (a concurrent re-embed): must be a no-op.
await repo.insert(
key=key,
storage_backend="local",
storage_key="embedding_cache/second.emb",
size_bytes=999,
chunk_count=42,
)
row = await repo.get(key)
assert row.storage_key == "embedding_cache/first.emb"
assert await repo.total_size_bytes() == 100
async def test_mark_used_increments_reuse_count(db_session):
repo = CachedEmbeddingSetRepository(db_session)
key = await _insert(repo, sha="a" * 64)
assert (await repo.get(key)).times_reused == 0
await repo.mark_used((await repo.get(key)).id)
await repo.mark_used((await repo.get(key)).id)
assert (await repo.get(key)).times_reused == 2

View file

@ -0,0 +1,70 @@
"""EmbeddingCacheService end-to-end against real Postgres + real local storage.
Exercises the public cache surface -- ``recall`` / ``remember`` -- with no mocks:
a miss returns nothing, a remembered set comes back as equivalent vectors, and a
dimension mismatch is refused rather than served.
"""
from __future__ import annotations
import numpy as np
import pytest
from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingKey, EmbeddingSet
from app.indexing_pipeline.cache.service import EmbeddingCacheService
pytestmark = pytest.mark.integration
def _key(sha: str = "c" * 64, *, dim: int = 4) -> EmbeddingKey:
return EmbeddingKey(
markdown_sha256=sha,
embedding_model="test-model",
embedding_dim=dim,
chunker_kind="hybrid",
chunker_version=1,
)
async def test_recall_is_a_miss_for_an_unknown_key(db_session, cache_local_storage):
service = EmbeddingCacheService(db_session)
assert await service.recall(_key()) is None
async def test_remembered_set_recalls_as_equivalent_vectors(
db_session, cache_local_storage, clean_embedding_cache_table
):
service = EmbeddingCacheService(db_session)
stored = EmbeddingSet(
summary_embedding=np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32),
chunks=[
CachedChunk("first chunk", np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32)),
CachedChunk("second chunk", np.array([0.0, 1.0, 0.0, 0.0], dtype=np.float32)),
],
)
await service.remember(_key(), stored)
recalled = await service.recall(_key())
assert recalled is not None
assert np.array_equal(recalled.summary_embedding, stored.summary_embedding)
assert [c.text for c in recalled.chunks] == ["first chunk", "second chunk"]
assert np.array_equal(recalled.chunks[0].embedding, stored.chunks[0].embedding)
assert np.array_equal(recalled.chunks[1].embedding, stored.chunks[1].embedding)
async def test_recall_refuses_a_set_whose_dimension_changed(
db_session, cache_local_storage, clean_embedding_cache_table
):
# A model kept its name but changed its output width: never serve the stale blob.
service = EmbeddingCacheService(db_session)
stored = EmbeddingSet(
summary_embedding=np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32),
chunks=[CachedChunk("c", np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32))],
)
await service.remember(_key(dim=4), stored)
# Same identity (model + chunker + markdown), but the caller now expects dim 8.
recalled = await service.recall(_key(dim=8))
assert recalled is None

View file

@ -0,0 +1,63 @@
"""EmbeddingCacheStore against a real local filesystem backend (no mocks).
Proves the blob side of the cache: an embedding set written under a
content-addressed key comes back with identical vectors, and a delete actually
removes it.
"""
from __future__ import annotations
import numpy as np
import pytest
from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingKey, EmbeddingSet
from app.indexing_pipeline.cache.storage import EmbeddingCacheStore
from app.indexing_pipeline.cache.storage.object_keys import build_embedding_object_key
pytestmark = pytest.mark.integration
def _key() -> EmbeddingKey:
return EmbeddingKey(
markdown_sha256="d" * 64,
embedding_model="test-model",
embedding_dim=4,
chunker_kind="hybrid",
chunker_version=1,
)
def _set() -> EmbeddingSet:
return EmbeddingSet(
summary_embedding=np.array([0.5, 0.25, 0.125, 0.0625], dtype=np.float32),
chunks=[
CachedChunk("café, naïve, 漢字", np.array([1, 2, 3, 4], dtype=np.float32)),
CachedChunk("second", np.array([5, 6, 7, 8], dtype=np.float32)),
],
)
async def test_save_then_load_round_trips_the_embedding_set(cache_local_storage):
store = EmbeddingCacheStore()
embedding_set = _set()
storage_key, size_bytes = await store.save(_key(), embedding_set)
loaded = await store.load(storage_key)
assert storage_key == build_embedding_object_key(_key())
assert size_bytes > 0
assert np.array_equal(loaded.summary_embedding, embedding_set.summary_embedding)
assert [c.text for c in loaded.chunks] == ["café, naïve, 漢字", "second"]
assert np.array_equal(loaded.chunks[0].embedding, embedding_set.chunks[0].embedding)
assert np.array_equal(loaded.chunks[1].embedding, embedding_set.chunks[1].embedding)
async def test_delete_removes_the_blob(cache_local_storage):
store = EmbeddingCacheStore()
storage_key, _ = await store.save(_key(), _set())
await store.delete(storage_key)
# Eviction deleted the blob; a later read must fail rather than serve stale.
with pytest.raises(FileNotFoundError):
await store.load(storage_key)