diff --git a/surfsense_backend/tests/integration/indexing_pipeline/cache/conftest.py b/surfsense_backend/tests/integration/indexing_pipeline/cache/conftest.py new file mode 100644 index 000000000..6acb457ee --- /dev/null +++ b/surfsense_backend/tests/integration/indexing_pipeline/cache/conftest.py @@ -0,0 +1,33 @@ +"""Real-infra fixtures for the embedding-cache integration tests. + +``cache_local_storage`` points the shared cache backend at a throwaway directory +so tests exercise the real ``LocalFileBackend`` (no cloud, no mocks); the +embedding cache reuses the ETL cache backend, hence the ``ETL_CACHE_STORAGE_*`` +knobs. ``clean_embedding_cache_table`` removes rows written through the store's +own committing session, which the savepoint-rolled-back ``db_session`` cannot undo. +""" + +from __future__ import annotations + +import pytest +import pytest_asyncio +from sqlalchemy import text + + +@pytest.fixture +def cache_local_storage(tmp_path, monkeypatch): + from app.config import config + from app.etl_pipeline.cache.storage.backend import resolve_cache_backend + + monkeypatch.setattr(config, "ETL_CACHE_STORAGE_BACKEND", "local") + monkeypatch.setattr(config, "ETL_CACHE_STORAGE_LOCAL_PATH", str(tmp_path)) + resolve_cache_backend.cache_clear() + yield tmp_path + resolve_cache_backend.cache_clear() + + +@pytest_asyncio.fixture +async def clean_embedding_cache_table(async_engine): + yield + async with async_engine.begin() as conn: + await conn.execute(text("DELETE FROM embedding_cache_sets")) diff --git a/surfsense_backend/tests/integration/indexing_pipeline/cache/test_cached_embedding_repository.py b/surfsense_backend/tests/integration/indexing_pipeline/cache/test_cached_embedding_repository.py new file mode 100644 index 000000000..446932793 --- /dev/null +++ b/surfsense_backend/tests/integration/indexing_pipeline/cache/test_cached_embedding_repository.py @@ -0,0 +1,110 @@ +"""CachedEmbeddingSetRepository against real Postgres: the SQL behind eviction & dedup. + +These verify the parts only a real database can: the size accumulator, +coldest-first ordering by reuse then recency, TTL cutoff selection, the +insert-once guarantee under a duplicate key, and the reuse counter. +""" + +from __future__ import annotations + +from datetime import UTC, datetime, timedelta + +import pytest + +from app.indexing_pipeline.cache.persistence import CachedEmbeddingSetRepository +from app.indexing_pipeline.cache.schemas import EmbeddingKey + +pytestmark = pytest.mark.integration + + +def _key(sha: str) -> EmbeddingKey: + return EmbeddingKey( + markdown_sha256=sha, + embedding_model="test-model", + embedding_dim=4, + chunker_kind="hybrid", + chunker_version=1, + ) + + +async def _insert(repo, *, sha, size=100, storage_key=None, chunk_count=1): + key = _key(sha) + await repo.insert( + key=key, + storage_backend="local", + storage_key=storage_key or f"embedding_cache/{sha}.emb", + size_bytes=size, + chunk_count=chunk_count, + ) + return key + + +async def test_total_size_bytes_sums_all_rows(db_session): + repo = CachedEmbeddingSetRepository(db_session) + await _insert(repo, sha="a" * 64, size=100) + await _insert(repo, sha="b" * 64, size=250) + + assert await repo.total_size_bytes() == 350 + + +async def test_select_coldest_orders_by_reuse_then_recency(db_session): + repo = CachedEmbeddingSetRepository(db_session) + ka = await _insert(repo, sha="a" * 64) + kb = await _insert(repo, sha="b" * 64) + kc = await _insert(repo, sha="c" * 64) + + # Warm B once and C twice; A stays untouched and should be coldest. + await repo.mark_used((await repo.get(kb)).id) + await repo.mark_used((await repo.get(kc)).id) + await repo.mark_used((await repo.get(kc)).id) + + coldest = await repo.select_coldest(limit=10) + + assert [c.id for c in coldest][:3] == [ + (await repo.get(ka)).id, + (await repo.get(kb)).id, + (await repo.get(kc)).id, + ] + + +async def test_select_expired_returns_only_rows_older_than_cutoff(db_session): + repo = CachedEmbeddingSetRepository(db_session) + await _insert(repo, sha="a" * 64) + + future = datetime.now(UTC) + timedelta(days=1) + past = datetime.now(UTC) - timedelta(days=1) + + # Row was just used, so it predates a future cutoff but not a past one. + assert len(await repo.select_expired(cutoff=future, limit=10)) == 1 + assert await repo.select_expired(cutoff=past, limit=10) == [] + + +async def test_duplicate_key_insert_keeps_the_first_row(db_session): + repo = CachedEmbeddingSetRepository(db_session) + key = await _insert( + repo, sha="a" * 64, size=100, storage_key="embedding_cache/first.emb" + ) + + # Same content-addressed key (a concurrent re-embed): must be a no-op. + await repo.insert( + key=key, + storage_backend="local", + storage_key="embedding_cache/second.emb", + size_bytes=999, + chunk_count=42, + ) + + row = await repo.get(key) + assert row.storage_key == "embedding_cache/first.emb" + assert await repo.total_size_bytes() == 100 + + +async def test_mark_used_increments_reuse_count(db_session): + repo = CachedEmbeddingSetRepository(db_session) + key = await _insert(repo, sha="a" * 64) + assert (await repo.get(key)).times_reused == 0 + + await repo.mark_used((await repo.get(key)).id) + await repo.mark_used((await repo.get(key)).id) + + assert (await repo.get(key)).times_reused == 2 diff --git a/surfsense_backend/tests/integration/indexing_pipeline/cache/test_embedding_cache_service.py b/surfsense_backend/tests/integration/indexing_pipeline/cache/test_embedding_cache_service.py new file mode 100644 index 000000000..2f4cd4a89 --- /dev/null +++ b/surfsense_backend/tests/integration/indexing_pipeline/cache/test_embedding_cache_service.py @@ -0,0 +1,70 @@ +"""EmbeddingCacheService end-to-end against real Postgres + real local storage. + +Exercises the public cache surface -- ``recall`` / ``remember`` -- with no mocks: +a miss returns nothing, a remembered set comes back as equivalent vectors, and a +dimension mismatch is refused rather than served. +""" + +from __future__ import annotations + +import numpy as np +import pytest + +from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingKey, EmbeddingSet +from app.indexing_pipeline.cache.service import EmbeddingCacheService + +pytestmark = pytest.mark.integration + + +def _key(sha: str = "c" * 64, *, dim: int = 4) -> EmbeddingKey: + return EmbeddingKey( + markdown_sha256=sha, + embedding_model="test-model", + embedding_dim=dim, + chunker_kind="hybrid", + chunker_version=1, + ) + + +async def test_recall_is_a_miss_for_an_unknown_key(db_session, cache_local_storage): + service = EmbeddingCacheService(db_session) + assert await service.recall(_key()) is None + + +async def test_remembered_set_recalls_as_equivalent_vectors( + db_session, cache_local_storage, clean_embedding_cache_table +): + service = EmbeddingCacheService(db_session) + stored = EmbeddingSet( + summary_embedding=np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32), + chunks=[ + CachedChunk("first chunk", np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32)), + CachedChunk("second chunk", np.array([0.0, 1.0, 0.0, 0.0], dtype=np.float32)), + ], + ) + + await service.remember(_key(), stored) + recalled = await service.recall(_key()) + + assert recalled is not None + assert np.array_equal(recalled.summary_embedding, stored.summary_embedding) + assert [c.text for c in recalled.chunks] == ["first chunk", "second chunk"] + assert np.array_equal(recalled.chunks[0].embedding, stored.chunks[0].embedding) + assert np.array_equal(recalled.chunks[1].embedding, stored.chunks[1].embedding) + + +async def test_recall_refuses_a_set_whose_dimension_changed( + db_session, cache_local_storage, clean_embedding_cache_table +): + # A model kept its name but changed its output width: never serve the stale blob. + service = EmbeddingCacheService(db_session) + stored = EmbeddingSet( + summary_embedding=np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32), + chunks=[CachedChunk("c", np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32))], + ) + await service.remember(_key(dim=4), stored) + + # Same identity (model + chunker + markdown), but the caller now expects dim 8. + recalled = await service.recall(_key(dim=8)) + + assert recalled is None diff --git a/surfsense_backend/tests/integration/indexing_pipeline/cache/test_embedding_store.py b/surfsense_backend/tests/integration/indexing_pipeline/cache/test_embedding_store.py new file mode 100644 index 000000000..83becd7b5 --- /dev/null +++ b/surfsense_backend/tests/integration/indexing_pipeline/cache/test_embedding_store.py @@ -0,0 +1,63 @@ +"""EmbeddingCacheStore against a real local filesystem backend (no mocks). + +Proves the blob side of the cache: an embedding set written under a +content-addressed key comes back with identical vectors, and a delete actually +removes it. +""" + +from __future__ import annotations + +import numpy as np +import pytest + +from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingKey, EmbeddingSet +from app.indexing_pipeline.cache.storage import EmbeddingCacheStore +from app.indexing_pipeline.cache.storage.object_keys import build_embedding_object_key + +pytestmark = pytest.mark.integration + + +def _key() -> EmbeddingKey: + return EmbeddingKey( + markdown_sha256="d" * 64, + embedding_model="test-model", + embedding_dim=4, + chunker_kind="hybrid", + chunker_version=1, + ) + + +def _set() -> EmbeddingSet: + return EmbeddingSet( + summary_embedding=np.array([0.5, 0.25, 0.125, 0.0625], dtype=np.float32), + chunks=[ + CachedChunk("café, naïve, 漢字", np.array([1, 2, 3, 4], dtype=np.float32)), + CachedChunk("second", np.array([5, 6, 7, 8], dtype=np.float32)), + ], + ) + + +async def test_save_then_load_round_trips_the_embedding_set(cache_local_storage): + store = EmbeddingCacheStore() + embedding_set = _set() + + storage_key, size_bytes = await store.save(_key(), embedding_set) + loaded = await store.load(storage_key) + + assert storage_key == build_embedding_object_key(_key()) + assert size_bytes > 0 + assert np.array_equal(loaded.summary_embedding, embedding_set.summary_embedding) + assert [c.text for c in loaded.chunks] == ["café, naïve, 漢字", "second"] + assert np.array_equal(loaded.chunks[0].embedding, embedding_set.chunks[0].embedding) + assert np.array_equal(loaded.chunks[1].embedding, embedding_set.chunks[1].embedding) + + +async def test_delete_removes_the_blob(cache_local_storage): + store = EmbeddingCacheStore() + storage_key, _ = await store.save(_key(), _set()) + + await store.delete(storage_key) + + # Eviction deleted the blob; a later read must fail rather than serve stale. + with pytest.raises(FileNotFoundError): + await store.load(storage_key)