test(etl-cache): cover store, service, and repository on real infra

This commit is contained in:
CREDO23 2026-06-12 11:50:57 +02:00
parent 3dec3231d0
commit c49a0f1233
4 changed files with 237 additions and 0 deletions

View file

@ -0,0 +1,32 @@
"""Real-infra fixtures for the parse-cache integration tests.
``cache_local_storage`` points the cache's blob store at a throwaway directory so
tests exercise the real ``LocalFileBackend`` (no cloud, no mocks). ``clean_cache_table``
removes rows written through the facade's own committing session, which the
savepoint-rolled-back ``db_session`` cannot undo.
"""
from __future__ import annotations
import pytest
import pytest_asyncio
from sqlalchemy import text
@pytest.fixture
def cache_local_storage(tmp_path, monkeypatch):
from app.config import config
from app.etl_pipeline.cache.storage.backend import resolve_cache_backend
monkeypatch.setattr(config, "ETL_CACHE_STORAGE_BACKEND", "local")
monkeypatch.setattr(config, "ETL_CACHE_STORAGE_LOCAL_PATH", str(tmp_path))
resolve_cache_backend.cache_clear()
yield tmp_path
resolve_cache_backend.cache_clear()
@pytest_asyncio.fixture
async def clean_cache_table(async_engine):
yield
async with async_engine.begin() as conn:
await conn.execute(text("DELETE FROM etl_cache_parses"))

View file

@ -0,0 +1,96 @@
"""CachedParseRepository against real Postgres: the SQL behind eviction & dedup.
These verify the parts that only a real database can: coldest-first ordering by
reuse then recency, TTL cutoff selection, the size accumulator, and the
insert-once guarantee under a duplicate key.
"""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import pytest
from app.etl_pipeline.cache.persistence import CachedParseRepository
from app.etl_pipeline.cache.schemas import ParseKey
pytestmark = pytest.mark.integration
def _key(sha: str) -> ParseKey:
return ParseKey.for_document(
sha, etl_service="LLAMACLOUD", mode="basic", version=1
)
async def _insert(repo, *, sha, size=100, storage_key=None):
key = _key(sha)
await repo.insert(
key=key,
content_type="application/pdf",
actual_pages=1,
storage_backend="local",
storage_key=storage_key or f"etl_cache/{sha}.md",
size_bytes=size,
)
return key
async def test_total_size_bytes_sums_all_rows(db_session):
repo = CachedParseRepository(db_session)
await _insert(repo, sha="a" * 64, size=100)
await _insert(repo, sha="b" * 64, size=250)
assert await repo.total_size_bytes() == 350
async def test_select_coldest_orders_by_reuse_then_recency(db_session):
repo = CachedParseRepository(db_session)
ka = await _insert(repo, sha="a" * 64)
kb = await _insert(repo, sha="b" * 64)
kc = await _insert(repo, sha="c" * 64)
# Warm B once and C twice; A stays untouched and should be coldest.
await repo.mark_used((await repo.get(kb)).id)
await repo.mark_used((await repo.get(kc)).id)
await repo.mark_used((await repo.get(kc)).id)
coldest = await repo.select_coldest(limit=10)
ids_by_reuse = [c.id for c in coldest]
assert ids_by_reuse[:3] == [
(await repo.get(ka)).id,
(await repo.get(kb)).id,
(await repo.get(kc)).id,
]
async def test_select_expired_returns_only_rows_older_than_cutoff(db_session):
repo = CachedParseRepository(db_session)
await _insert(repo, sha="a" * 64)
future = datetime.now(UTC) + timedelta(days=1)
past = datetime.now(UTC) - timedelta(days=1)
# Row was just used, so it's older than a future cutoff but not a past one.
assert len(await repo.select_expired(cutoff=future, limit=10)) == 1
assert await repo.select_expired(cutoff=past, limit=10) == []
async def test_duplicate_key_insert_keeps_the_first_row(db_session):
repo = CachedParseRepository(db_session)
key = await _insert(repo, sha="a" * 64, size=100, storage_key="etl_cache/first.md")
# Same content-addressed key (a concurrent re-parse): must be a no-op.
await repo.insert(
key=key,
content_type="application/pdf",
actual_pages=1,
storage_backend="local",
storage_key="etl_cache/second.md",
size_bytes=999,
)
row = await repo.get(key)
assert row.storage_key == "etl_cache/first.md"
assert await repo.total_size_bytes() == 100

View file

@ -0,0 +1,67 @@
"""EtlCacheService end-to-end against real Postgres + real local storage.
Exercises the public cache surface -- ``recall`` / ``remember`` -- with no mocks:
a miss returns nothing, and a remembered parse comes back as an equivalent
``EtlResult`` rebuilt from the row and the blob.
"""
from __future__ import annotations
import pytest
from app.etl_pipeline.cache.schemas import ParseKey
from app.etl_pipeline.cache.service import EtlCacheService
from app.etl_pipeline.etl_document import EtlResult
pytestmark = pytest.mark.integration
def _key(sha: str = "c" * 64) -> ParseKey:
return ParseKey.for_document(
sha, etl_service="LLAMACLOUD", mode="basic", version=1
)
async def test_recall_is_a_miss_for_an_unknown_key(db_session, cache_local_storage):
service = EtlCacheService(db_session)
assert await service.recall(_key()) is None
async def test_remembered_parse_recalls_as_equivalent_result(
db_session, cache_local_storage
):
service = EtlCacheService(db_session)
stored = EtlResult(
markdown_content="# Cached doc\n\nBody paragraph.\n",
etl_service="LLAMACLOUD",
actual_pages=7,
content_type="application/pdf",
)
await service.remember(_key(), stored)
recalled = await service.recall(_key())
assert recalled is not None
assert recalled.markdown_content == stored.markdown_content
assert recalled.etl_service == "LLAMACLOUD"
assert recalled.actual_pages == 7
assert recalled.content_type == "application/pdf"
async def test_repeated_recall_keeps_serving_the_same_content(
db_session, cache_local_storage
):
service = EtlCacheService(db_session)
stored = EtlResult(
markdown_content="# Stable\n",
etl_service="LLAMACLOUD",
actual_pages=1,
content_type="application/pdf",
)
await service.remember(_key(), stored)
first = await service.recall(_key())
second = await service.recall(_key())
assert first is not None and second is not None
assert first.markdown_content == second.markdown_content == "# Stable\n"

View file

@ -0,0 +1,42 @@
"""MarkdownCacheStore against a real local filesystem backend (no mocks).
Proves the blob side of the cache: markdown written under a content-addressed key
comes back byte-for-byte, and a delete actually removes it.
"""
from __future__ import annotations
import pytest
from app.etl_pipeline.cache.schemas import ParseKey
from app.etl_pipeline.cache.storage import MarkdownCacheStore
from app.etl_pipeline.cache.storage.object_keys import build_parse_object_key
pytestmark = pytest.mark.integration
def _key() -> ParseKey:
return ParseKey.for_document(
"d" * 64, etl_service="LLAMACLOUD", mode="basic", version=1
)
async def test_save_then_load_round_trips_markdown(cache_local_storage):
store = MarkdownCacheStore()
markdown = "# Title\n\nBody with unicode: café, naïve, 漢字.\n"
storage_key = await store.save(_key(), markdown)
assert storage_key == build_parse_object_key(_key())
assert await store.load(storage_key) == markdown
async def test_delete_removes_the_blob(cache_local_storage):
store = MarkdownCacheStore()
storage_key = await store.save(_key(), "to be deleted")
await store.delete(storage_key)
# Eviction deleted the blob; a later read must fail rather than serve stale.
with pytest.raises(FileNotFoundError):
await store.load(storage_key)