This commit is contained in:
Thierry CH. 2026-06-12 10:46:11 -07:00 committed by GitHub
commit 00dd9df44f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
92 changed files with 3392 additions and 109 deletions

View file

@ -311,6 +311,42 @@ FILE_STORAGE_BACKEND=local
# AZURE_STORAGE_CONNECTION_STRING=DefaultEndpointsProtocol=https;AccountName=...;AccountKey=...;EndpointSuffix=core.windows.net
# AZURE_STORAGE_CONTAINER=surfsense-documents
# ETL Parse Cache
# Reuse parser output for identical file bytes across workspaces (skips paid
# re-parsing on LlamaCloud / Azure DI / Unstructured). Off by default.
ETL_CACHE_ENABLED=false
# Bump to invalidate all cached entries after a parser/behaviour change.
# ETL_CACHE_PARSER_VERSION=1
# Prune entries unused for this many days.
# ETL_CACHE_TTL_DAYS=90
# Soft cap on total cached markdown; coldest entries are evicted past it.
# ETL_CACHE_MAX_TOTAL_MB=5120
# Rows deleted per eviction pass.
# ETL_CACHE_EVICTION_BATCH=500
# Optional dedicated blob storage; unset reuses the main file storage backend.
# ETL_CACHE_STORAGE_BACKEND=azure
# ETL_CACHE_STORAGE_CONTAINER=surfsense-etl-cache
# ETL_CACHE_STORAGE_LOCAL_PATH=/var/lib/surfsense/etl-cache
# Embedding Cache
# Reuse chunk+embedding output for identical markdown across workspaces (skips
# re-chunking and re-embedding). Blobs share the ETL_CACHE_STORAGE_* backend.
# Off by default.
EMBEDDING_CACHE_ENABLED=false
# Bump to invalidate all cached embedding sets after a chunker change.
# EMBEDDING_CACHE_CHUNKER_VERSION=1
# Prune entries unused for this many days.
# EMBEDDING_CACHE_TTL_DAYS=90
# Soft cap on total cached embeddings; coldest entries are evicted past it.
# EMBEDDING_CACHE_MAX_TOTAL_MB=5120
# Rows deleted per eviction pass.
# EMBEDDING_CACHE_EVICTION_BATCH=500
# Incremental re-indexing: on document edits, keep chunks whose text is
# unchanged (reusing their embeddings) and embed only new/changed ones.
# Set to false to fall back to delete-all + full re-embed (kill switch).
# CHUNK_RECONCILE_ENABLED=true
# Daytona Sandbox (isolated code execution)
# DAYTONA_SANDBOX_ENABLED=FALSE
# DAYTONA_API_KEY=your-daytona-api-key

View file

@ -0,0 +1,53 @@
"""add etl_cache_parses table for content-addressed parse reuse
Revision ID: 160
Revises: 159
"""
from collections.abc import Sequence
from alembic import op
revision: str = "160"
down_revision: str | None = "159"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.execute(
"""
CREATE TABLE IF NOT EXISTS etl_cache_parses (
id SERIAL PRIMARY KEY,
source_sha256 VARCHAR(64) NOT NULL,
etl_service VARCHAR(32) NOT NULL,
mode VARCHAR(16) NOT NULL,
parser_version INTEGER NOT NULL,
storage_backend VARCHAR(32) NOT NULL,
storage_key TEXT NOT NULL,
size_bytes BIGINT NOT NULL,
content_type VARCHAR(32) NOT NULL,
actual_pages INTEGER NOT NULL DEFAULT 0,
times_reused BIGINT NOT NULL DEFAULT 0,
last_used_at TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
CONSTRAINT uq_etl_cache_parses_key
UNIQUE (source_sha256, etl_service, mode, parser_version)
);
"""
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_etl_cache_parses_last_used_at "
"ON etl_cache_parses(last_used_at);"
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_etl_cache_parses_created_at "
"ON etl_cache_parses(created_at);"
)
def downgrade() -> None:
op.execute("DROP INDEX IF EXISTS ix_etl_cache_parses_created_at;")
op.execute("DROP INDEX IF EXISTS ix_etl_cache_parses_last_used_at;")
op.execute("DROP TABLE IF EXISTS etl_cache_parses;")

View file

@ -0,0 +1,53 @@
"""add embedding_cache_sets table for content-addressed embedding reuse
Revision ID: 161
Revises: 160
"""
from collections.abc import Sequence
from alembic import op
revision: str = "161"
down_revision: str | None = "160"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.execute(
"""
CREATE TABLE IF NOT EXISTS embedding_cache_sets (
id SERIAL PRIMARY KEY,
markdown_sha256 VARCHAR(64) NOT NULL,
embedding_model VARCHAR(255) NOT NULL,
embedding_dim INTEGER NOT NULL,
chunker_kind VARCHAR(8) NOT NULL,
chunker_version INTEGER NOT NULL,
storage_backend VARCHAR(32) NOT NULL,
storage_key TEXT NOT NULL,
size_bytes BIGINT NOT NULL,
chunk_count INTEGER NOT NULL DEFAULT 0,
times_reused BIGINT NOT NULL DEFAULT 0,
last_used_at TIMESTAMP WITH TIME ZONE NOT NULL,
created_at TIMESTAMP WITH TIME ZONE NOT NULL DEFAULT NOW(),
CONSTRAINT uq_embedding_cache_sets_key
UNIQUE (markdown_sha256, embedding_model, chunker_kind, chunker_version)
);
"""
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_embedding_cache_sets_last_used_at "
"ON embedding_cache_sets(last_used_at);"
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_embedding_cache_sets_created_at "
"ON embedding_cache_sets(created_at);"
)
def downgrade() -> None:
op.execute("DROP INDEX IF EXISTS ix_embedding_cache_sets_created_at;")
op.execute("DROP INDEX IF EXISTS ix_embedding_cache_sets_last_used_at;")
op.execute("DROP TABLE IF EXISTS embedding_cache_sets;")

View file

@ -0,0 +1,51 @@
"""add chunks.position for explicit document order
Incremental re-indexing keeps unchanged chunk rows, so auto-increment ids no
longer reflect document order. Backfill preserves the historical id ordering.
Revision ID: 162
Revises: 161
"""
from collections.abc import Sequence
from alembic import op
revision: str = "162"
down_revision: str | None = "161"
branch_labels: str | Sequence[str] | None = None
depends_on: str | Sequence[str] | None = None
def upgrade() -> None:
op.execute(
"ALTER TABLE chunks ADD COLUMN IF NOT EXISTS position INTEGER NOT NULL DEFAULT 0;"
)
# Backfill: document order so far has been the insertion order (id).
op.execute(
"""
UPDATE chunks
SET position = numbered.rn
FROM (
SELECT id,
ROW_NUMBER() OVER (PARTITION BY document_id ORDER BY id) - 1 AS rn
FROM chunks
) AS numbered
WHERE chunks.id = numbered.id;
"""
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_chunks_position ON chunks(position);"
)
op.execute(
"CREATE INDEX IF NOT EXISTS ix_chunks_document_id_position "
"ON chunks(document_id, position);"
)
def downgrade() -> None:
op.execute("DROP INDEX IF EXISTS ix_chunks_document_id_position;")
op.execute("DROP INDEX IF EXISTS ix_chunks_position;")
op.execute("ALTER TABLE chunks DROP COLUMN IF EXISTS position;")

View file

@ -241,8 +241,15 @@ async def _create_document(
chunk_embeddings = await asyncio.to_thread(embed_texts, chunks)
session.add_all(
[
Chunk(document_id=doc.id, content=text, embedding=embedding)
for text, embedding in zip(chunks, chunk_embeddings, strict=True)
Chunk(
document_id=doc.id,
content=text,
embedding=embedding,
position=i,
)
for i, (text, embedding) in enumerate(
zip(chunks, chunk_embeddings, strict=True)
)
]
)
return doc
@ -289,8 +296,15 @@ async def _update_document(
chunk_embeddings = await asyncio.to_thread(embed_texts, chunks)
session.add_all(
[
Chunk(document_id=document.id, content=text, embedding=embedding)
for text, embedding in zip(chunks, chunk_embeddings, strict=True)
Chunk(
document_id=document.id,
content=text,
embedding=embedding,
position=i,
)
for i, (text, embedding) in enumerate(
zip(chunks, chunk_embeddings, strict=True)
)
]
)
return document
@ -475,7 +489,9 @@ async def _load_chunks_for_snapshot(
session: AsyncSession, *, doc_id: int
) -> list[dict[str, str]]:
rows = await session.execute(
select(Chunk.content).where(Chunk.document_id == doc_id).order_by(Chunk.id)
select(Chunk.content)
.where(Chunk.document_id == doc_id)
.order_by(Chunk.position, Chunk.id)
)
return [{"content": row.content} for row in rows.all() if row.content is not None]

View file

@ -508,7 +508,7 @@ class KBPostgresBackend(BackendProtocol):
chunk_rows = await session.execute(
select(Chunk.id, Chunk.content)
.where(Chunk.document_id == document.id)
.order_by(Chunk.id)
.order_by(Chunk.position, Chunk.id)
)
chunks = [
{"chunk_id": row.id, "content": row.content} for row in chunk_rows.all()
@ -725,7 +725,7 @@ class KBPostgresBackend(BackendProtocol):
.join(Document, Document.id == Chunk.document_id)
.where(Document.search_space_id == self.search_space_id)
.where(Chunk.content.ilike(f"%{pattern}%"))
.order_by(Chunk.document_id, Chunk.id)
.order_by(Chunk.document_id, Chunk.position, Chunk.id)
)
chunk_rows = await session.execute(sub)
per_doc: dict[int, int] = {}

View file

@ -394,7 +394,10 @@ async def browse_recent_documents(
Chunk.document_id,
Chunk.content,
func.row_number()
.over(partition_by=Chunk.document_id, order_by=Chunk.id)
.over(
partition_by=Chunk.document_id,
order_by=(Chunk.position, Chunk.id),
)
.label("rn"),
)
.where(Chunk.document_id.in_(doc_ids))
@ -404,7 +407,7 @@ async def browse_recent_documents(
chunk_query = (
select(numbered.c.chunk_id, numbered.c.document_id, numbered.c.content)
.where(numbered.c.rn <= _RECENCY_MAX_CHUNKS_PER_DOC)
.order_by(numbered.c.document_id, numbered.c.chunk_id)
.order_by(numbered.c.document_id, numbered.c.rn)
)
chunk_result = await session.execute(chunk_query)
fetched_chunks = chunk_result.all()
@ -531,7 +534,7 @@ async def fetch_mentioned_documents(
chunk_result = await session.execute(
select(Chunk.id, Chunk.content, Chunk.document_id)
.where(Chunk.document_id.in_(list(docs.keys())))
.order_by(Chunk.document_id, Chunk.id)
.order_by(Chunk.document_id, Chunk.position, Chunk.id)
)
chunks_by_doc: dict[int, list[dict[str, Any]]] = {doc_id: [] for doc_id in docs}
for row in chunk_result.all():

View file

@ -122,7 +122,7 @@ async def _browse_recent_documents(
chunk_query = (
select(Chunk)
.where(Chunk.document_id.in_(doc_ids))
.order_by(Chunk.document_id, Chunk.id)
.order_by(Chunk.document_id, Chunk.position, Chunk.id)
)
chunk_result = await session.execute(chunk_query)
raw_chunks = chunk_result.scalars().all()

View file

@ -192,6 +192,8 @@ celery_app = Celery(
"app.tasks.celery_tasks.stripe_reconciliation_task",
"app.tasks.celery_tasks.auto_reload_task",
"app.tasks.celery_tasks.gateway_tasks",
"app.etl_pipeline.cache.eviction.task",
"app.indexing_pipeline.cache.eviction.task",
"app.automations.tasks.execute_run",
"app.automations.triggers.builtin.schedule.selector",
"app.automations.triggers.builtin.event.selector",
@ -306,6 +308,18 @@ celery_app.conf.beat_schedule = {
"schedule": crontab(hour="3", minute="17"),
"options": {"expires": 600},
},
# Prune the ETL parse cache (TTL + size budget) once daily, off-peak.
"evict-etl-cache": {
"task": "evict_etl_cache",
"schedule": crontab(hour="4", minute="0"),
"options": {"expires": 600},
},
# Prune the embedding cache (chunk+embedding sets) once daily, off-peak.
"evict-embedding-cache": {
"task": "evict_embedding_cache",
"schedule": crontab(hour="4", minute="30"),
"options": {"expires": 600},
},
# Fire due automation schedule triggers (Beat entry owned by the schedule
# trigger; see app.automations.triggers.builtin.schedule.source).
**SCHEDULE_BEAT_SCHEDULE,

View file

@ -952,6 +952,40 @@ class Config:
AZURE_DI_ENDPOINT = os.getenv("AZURE_DI_ENDPOINT")
AZURE_DI_KEY = os.getenv("AZURE_DI_KEY")
# ETL parse cache: reuse parser output for identical bytes across workspaces.
ETL_CACHE_ENABLED = os.getenv("ETL_CACHE_ENABLED", "false").strip().lower() == "true"
# Bump to invalidate every cached entry after a parser/behaviour change.
ETL_CACHE_PARSER_VERSION = int(os.getenv("ETL_CACHE_PARSER_VERSION", "1"))
ETL_CACHE_TTL_DAYS = int(os.getenv("ETL_CACHE_TTL_DAYS", "90"))
ETL_CACHE_MAX_TOTAL_MB = int(os.getenv("ETL_CACHE_MAX_TOTAL_MB", "5120"))
ETL_CACHE_EVICTION_BATCH = int(os.getenv("ETL_CACHE_EVICTION_BATCH", "500"))
# Optional dedicated blob storage; unset reuses the main file_storage backend.
ETL_CACHE_STORAGE_BACKEND = os.getenv("ETL_CACHE_STORAGE_BACKEND")
ETL_CACHE_STORAGE_CONTAINER = os.getenv("ETL_CACHE_STORAGE_CONTAINER")
ETL_CACHE_STORAGE_LOCAL_PATH = os.getenv("ETL_CACHE_STORAGE_LOCAL_PATH")
# Embedding cache: reuse chunk+embedding output for identical markdown across
# workspaces. Blobs share the ETL_CACHE_STORAGE_* backend.
EMBEDDING_CACHE_ENABLED = (
os.getenv("EMBEDDING_CACHE_ENABLED", "false").strip().lower() == "true"
)
# Bump to invalidate every cached embedding set after a chunker change.
EMBEDDING_CACHE_CHUNKER_VERSION = int(
os.getenv("EMBEDDING_CACHE_CHUNKER_VERSION", "1")
)
EMBEDDING_CACHE_TTL_DAYS = int(os.getenv("EMBEDDING_CACHE_TTL_DAYS", "90"))
EMBEDDING_CACHE_MAX_TOTAL_MB = int(os.getenv("EMBEDDING_CACHE_MAX_TOTAL_MB", "5120"))
EMBEDDING_CACHE_EVICTION_BATCH = int(
os.getenv("EMBEDDING_CACHE_EVICTION_BATCH", "500")
)
# Incremental re-indexing: on document edits, keep chunk rows whose text is
# unchanged (reusing their embeddings) and embed only new/changed chunks.
# Kill switch -- disabling falls back to delete-all + full re-embed.
CHUNK_RECONCILE_ENABLED = (
os.getenv("CHUNK_RECONCILE_ENABLED", "true").strip().lower() == "true"
)
# Proxy provider selection. Maps to a ProxyProvider implementation registered
# in app/utils/proxy/registry.py. Add new vendors there and switch via this var.
PROXY_PROVIDER = os.getenv("PROXY_PROVIDER", "anonymous_proxies")

View file

@ -90,11 +90,12 @@ async def download_and_extract_content(
if error:
return None, metadata, error
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=temp_file_path, filename=file_name)
result = await extract_with_cache(
EtlRequest(file_path=temp_file_path, filename=file_name),
vision_llm=vision_llm,
)
markdown = result.markdown_content
return markdown, metadata, None

View file

@ -122,12 +122,13 @@ async def download_and_extract_content(
async def _parse_file_to_markdown(
file_path: str, filename: str, *, vision_llm=None
) -> str:
"""Parse a local file to markdown using the unified ETL pipeline."""
"""Parse a local file to markdown via the cache-aware ETL pipeline."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename)
result = await extract_with_cache(
EtlRequest(file_path=file_path, filename=filename),
vision_llm=vision_llm,
)
return result.markdown_content

View file

@ -84,11 +84,12 @@ async def download_and_extract_content(
async def _parse_file_to_markdown(
file_path: str, filename: str, *, vision_llm=None
) -> str:
"""Parse a local file to markdown using the unified ETL pipeline."""
"""Parse a local file to markdown via the cache-aware ETL pipeline."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename)
result = await extract_with_cache(
EtlRequest(file_path=file_path, filename=filename),
vision_llm=vision_llm,
)
return result.markdown_content

View file

@ -1484,7 +1484,10 @@ class Document(BaseModel, TimestampMixin):
created_by = relationship("User", back_populates="documents")
connector = relationship("SearchSourceConnector", back_populates="documents")
chunks = relationship(
"Chunk", back_populates="document", cascade="all, delete-orphan"
"Chunk",
back_populates="document",
cascade="all, delete-orphan",
order_by="Chunk.position",
)
# Original upload + future derived artifacts (redacted, filled-form).
# Model lives in app.file_storage.persistence to keep that feature cohesive.
@ -1520,6 +1523,9 @@ class Chunk(BaseModel, TimestampMixin):
content = Column(Text, nullable=False)
embedding = Column(Vector(config.embedding_model_instance.dimension))
# Explicit document order; ids don't follow it since incremental
# re-indexing keeps unchanged rows across edits.
position = Column(Integer, nullable=False, server_default="0", index=True)
document_id = Column(
Integer,
@ -2864,7 +2870,11 @@ from app.automations.persistence import ( # noqa: E402, F401
AutomationRun,
AutomationTrigger,
)
from app.etl_pipeline.cache.persistence.models import CachedParse # noqa: E402, F401
from app.file_storage.persistence import DocumentFile # noqa: E402, F401
from app.indexing_pipeline.cache.persistence.models import ( # noqa: E402, F401
CachedEmbeddingSet,
)
from app.notifications.persistence import Notification # noqa: E402, F401
from app.podcasts.persistence import ( # noqa: E402, F401
Podcast,

View file

@ -0,0 +1,11 @@
"""Content-addressed reuse of expensive ETL parser output across workspaces."""
from __future__ import annotations
from app.etl_pipeline.cache.cached_extraction import extract_with_cache
from app.etl_pipeline.cache.service import EtlCacheService
__all__ = [
"EtlCacheService",
"extract_with_cache",
]

View file

@ -0,0 +1,88 @@
"""Entry point: serve ETL parses from cache, parsing only on a miss."""
from __future__ import annotations
import asyncio
import hashlib
import logging
from app.config import config
from app.etl_pipeline.cache.eligibility import is_parse_cacheable
from app.etl_pipeline.cache.schemas import ParseKey
from app.etl_pipeline.cache.service import EtlCacheService
from app.etl_pipeline.cache.settings import load_etl_cache_settings
from app.etl_pipeline.etl_document import EtlRequest, EtlResult
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.observability import metrics
logger = logging.getLogger(__name__)
_HASH_CHUNK = 1024 * 1024
async def extract_with_cache(
request: EtlRequest, *, vision_llm=None
) -> EtlResult:
"""Drop-in for ``EtlPipelineService.extract`` that reuses prior parser output."""
settings = load_etl_cache_settings()
cacheable = is_parse_cacheable(
filename=request.filename,
etl_service=config.ETL_SERVICE,
cache_enabled=settings.enabled,
has_vision_llm=vision_llm is not None,
)
if not cacheable:
return await EtlPipelineService(vision_llm=vision_llm).extract(request)
key = ParseKey.for_document(
await asyncio.to_thread(_hash_file, request.file_path),
etl_service=config.ETL_SERVICE,
mode=request.processing_mode.value,
version=settings.parser_version,
)
cached_result = await _recall(key)
if cached_result is not None:
metrics.record_etl_cache_lookup(
etl_service=key.etl_service, mode=key.mode, outcome="hit"
)
logger.debug("ETL cache hit for %s", key.source_sha256)
return cached_result
metrics.record_etl_cache_lookup(
etl_service=key.etl_service, mode=key.mode, outcome="miss"
)
result = await EtlPipelineService(vision_llm=vision_llm).extract(request)
await _remember(key, result)
return result
async def _recall(key: ParseKey) -> EtlResult | None:
# Caching is best-effort: any failure falls through to a normal parse.
try:
from app.tasks.celery_tasks import get_celery_session_maker
async with get_celery_session_maker()() as session:
return await EtlCacheService(session).recall(key)
except Exception:
logger.warning("ETL cache recall failed; parsing fresh", exc_info=True)
return None
async def _remember(key: ParseKey, result: EtlResult) -> None:
try:
from app.tasks.celery_tasks import get_celery_session_maker
async with get_celery_session_maker()() as session:
await EtlCacheService(session).remember(key, result)
except Exception:
logger.warning("ETL cache write failed; result not cached", exc_info=True)
def _hash_file(path: str) -> str:
digest = hashlib.sha256()
with open(path, "rb") as handle:
for chunk in iter(lambda: handle.read(_HASH_CHUNK), b""):
digest.update(chunk)
return digest.hexdigest()

View file

@ -0,0 +1,28 @@
"""Gating rule: may this upload be served from / written to the parse cache?"""
from __future__ import annotations
from app.etl_pipeline.file_classifier import FileCategory, classify_file
def is_parse_cacheable(
*,
filename: str,
etl_service: str | None,
cache_enabled: bool,
has_vision_llm: bool,
) -> bool:
"""Only deterministic document parses are shareable across workspaces.
Vision-LLM runs append model-generated content not captured by the cache key,
and a missing ETL service means there is no document parser to key against --
both bypass the cache. Non-document categories (plaintext, audio, images,
direct-convert) are cheap or parser-agnostic and are handled outside it.
"""
if not cache_enabled:
return False
if has_vision_llm:
return False
if not etl_service:
return False
return classify_file(filename) == FileCategory.DOCUMENT

View file

@ -0,0 +1,9 @@
"""Background pruning of the parse cache by age and size budget."""
from __future__ import annotations
from .task import evict_etl_cache_task
__all__ = [
"evict_etl_cache_task",
]

View file

@ -0,0 +1,28 @@
"""Pure selection rules for which cached entries to drop."""
from __future__ import annotations
from collections.abc import Iterable
from app.etl_pipeline.cache.schemas import EvictionCandidate
def select_over_budget(
coldest_first: Iterable[EvictionCandidate],
*,
current_total_bytes: int,
max_total_bytes: int,
) -> list[EvictionCandidate]:
"""Pick coldest entries until the footprint drops under the budget."""
bytes_to_free = current_total_bytes - max_total_bytes
if bytes_to_free <= 0:
return []
chosen: list[EvictionCandidate] = []
bytes_freed = 0
for candidate in coldest_first:
if bytes_freed >= bytes_to_free:
break
chosen.append(candidate)
bytes_freed += candidate.size_bytes
return chosen

View file

@ -0,0 +1,66 @@
"""Celery task that prunes the parse cache by TTL, then by size budget."""
from __future__ import annotations
import contextlib
import logging
from datetime import UTC, datetime, timedelta
from app.celery_app import celery_app
from app.etl_pipeline.cache.eviction.policy import select_over_budget
from app.etl_pipeline.cache.persistence import CachedParseRepository
from app.etl_pipeline.cache.schemas import EvictionCandidate
from app.etl_pipeline.cache.settings import load_etl_cache_settings
from app.etl_pipeline.cache.storage import MarkdownCacheStore
from app.observability import metrics
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
logger = logging.getLogger(__name__)
@celery_app.task(name="evict_etl_cache")
def evict_etl_cache_task():
return run_async_celery_task(_evict)
async def _evict() -> None:
"""Expire stale entries, then shed the coldest overflow only if still over budget."""
settings = load_etl_cache_settings()
if not settings.enabled:
return
store = MarkdownCacheStore()
async with get_celery_session_maker()() as session:
index = CachedParseRepository(session)
cutoff = datetime.now(UTC) - timedelta(days=settings.ttl_days)
expired = await index.select_expired(cutoff=cutoff, limit=settings.eviction_batch)
await _drop(index, store, expired, phase="ttl")
total = await index.total_size_bytes()
if total > settings.max_total_bytes:
coldest = await index.select_coldest(limit=settings.eviction_batch)
over_budget = select_over_budget(
coldest,
current_total_bytes=total,
max_total_bytes=settings.max_total_bytes,
)
await _drop(index, store, over_budget, phase="size")
async def _drop(
index: CachedParseRepository,
store: MarkdownCacheStore,
candidates: list[EvictionCandidate],
*,
phase: str,
) -> None:
if not candidates:
return
for candidate in candidates:
# Drop the index row even if the blob delete fails (orphan blob is harmless).
with contextlib.suppress(Exception):
await store.delete(candidate.storage_key)
await index.delete_by_ids([candidate.id for candidate in candidates])
metrics.record_etl_cache_eviction(len(candidates), phase=phase)
logger.info("Evicted %d cached parses (%s)", len(candidates), phase)

View file

@ -0,0 +1,11 @@
"""Database access for cached parse rows."""
from __future__ import annotations
from .models import CachedParse
from .repository import CachedParseRepository
__all__ = [
"CachedParse",
"CachedParseRepository",
]

View file

@ -0,0 +1,49 @@
"""``etl_cache_parses``: one reusable parser result per (bytes + recipe)."""
from __future__ import annotations
from sqlalchemy import (
BigInteger,
Column,
DateTime,
Index,
Integer,
String,
UniqueConstraint,
)
from app.db import BaseModel, TimestampMixin
class CachedParse(BaseModel, TimestampMixin):
__tablename__ = "etl_cache_parses"
# Key: raw bytes + the recipe that produced the markdown.
source_sha256 = Column(String(64), nullable=False)
etl_service = Column(String(32), nullable=False)
mode = Column(String(16), nullable=False)
parser_version = Column(Integer, nullable=False)
# Where the markdown blob lives (kept out of the row to stay small).
storage_backend = Column(String(32), nullable=False)
storage_key = Column(String, nullable=False)
size_bytes = Column(BigInteger, nullable=False)
# Payload needed to rebuild the EtlResult on a hit.
content_type = Column(String(32), nullable=False)
actual_pages = Column(Integer, nullable=False, default=0, server_default="0")
# Drives eviction (popularity + recency).
times_reused = Column(BigInteger, nullable=False, default=0, server_default="0")
last_used_at = Column(DateTime(timezone=True), nullable=False)
__table_args__ = (
UniqueConstraint(
"source_sha256",
"etl_service",
"mode",
"parser_version",
name="uq_etl_cache_parses_key",
),
Index("ix_etl_cache_parses_last_used_at", "last_used_at"),
)

View file

@ -0,0 +1,121 @@
"""CRUD and eviction selectors for ``etl_cache_parses`` (no business rules)."""
from __future__ import annotations
from datetime import UTC, datetime
from sqlalchemy import delete, func, select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from app.etl_pipeline.cache.schemas import EvictionCandidate, ParseKey
from .models import CachedParse
_EVICTION_COLUMNS = (
CachedParse.id,
CachedParse.storage_key,
CachedParse.size_bytes,
CachedParse.last_used_at,
CachedParse.times_reused,
)
def _as_eviction_candidate(row) -> EvictionCandidate:
return EvictionCandidate(
id=row.id,
storage_key=row.storage_key,
size_bytes=row.size_bytes,
last_used_at=row.last_used_at,
times_reused=row.times_reused,
)
class CachedParseRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get(self, key: ParseKey) -> CachedParse | None:
result = await self._session.execute(
select(CachedParse).where(
CachedParse.source_sha256 == key.source_sha256,
CachedParse.etl_service == key.etl_service,
CachedParse.mode == key.mode,
CachedParse.parser_version == key.version,
)
)
return result.scalars().first()
async def insert(
self,
*,
key: ParseKey,
content_type: str,
actual_pages: int,
storage_backend: str,
storage_key: str,
size_bytes: int,
) -> None:
# Concurrent writers parse identical bytes, so a lost race is harmless.
now = datetime.now(UTC)
await self._session.execute(
pg_insert(CachedParse)
.values(
source_sha256=key.source_sha256,
etl_service=key.etl_service,
mode=key.mode,
parser_version=key.version,
content_type=content_type,
actual_pages=actual_pages,
storage_backend=storage_backend,
storage_key=storage_key,
size_bytes=size_bytes,
times_reused=0,
last_used_at=now,
created_at=now,
)
.on_conflict_do_nothing(constraint="uq_etl_cache_parses_key")
)
await self._session.commit()
async def mark_used(self, row_id: int) -> None:
await self._session.execute(
update(CachedParse)
.where(CachedParse.id == row_id)
.values(
times_reused=CachedParse.times_reused + 1,
last_used_at=datetime.now(UTC),
)
)
await self._session.commit()
async def total_size_bytes(self) -> int:
result = await self._session.execute(
select(func.coalesce(func.sum(CachedParse.size_bytes), 0))
)
return int(result.scalar() or 0)
async def select_expired(
self, *, cutoff: datetime, limit: int
) -> list[EvictionCandidate]:
result = await self._session.execute(
select(*_EVICTION_COLUMNS)
.where(CachedParse.last_used_at < cutoff)
.order_by(CachedParse.last_used_at.asc())
.limit(limit)
)
return [_as_eviction_candidate(row) for row in result]
async def select_coldest(self, *, limit: int) -> list[EvictionCandidate]:
result = await self._session.execute(
select(*_EVICTION_COLUMNS)
.order_by(CachedParse.times_reused.asc(), CachedParse.last_used_at.asc())
.limit(limit)
)
return [_as_eviction_candidate(row) for row in result]
async def delete_by_ids(self, ids: list[int]) -> None:
if not ids:
return
await self._session.execute(delete(CachedParse).where(CachedParse.id.in_(ids)))
await self._session.commit()

View file

@ -0,0 +1,11 @@
"""Pure value objects for the parse cache."""
from __future__ import annotations
from .eviction_candidate import EvictionCandidate
from .parse_key import ParseKey
__all__ = [
"EvictionCandidate",
"ParseKey",
]

View file

@ -0,0 +1,15 @@
"""Row projection handed to the eviction policy."""
from __future__ import annotations
from dataclasses import dataclass
from datetime import datetime
@dataclass(frozen=True, slots=True)
class EvictionCandidate:
id: int
storage_key: str
size_bytes: int
last_used_at: datetime
times_reused: int

View file

@ -0,0 +1,28 @@
"""Identity of a cacheable parse: equal keys yield identical markdown."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class ParseKey:
source_sha256: str
etl_service: str
mode: str
version: int
@classmethod
def for_document(
cls, source_sha256: str, *, etl_service: str, mode: str, version: int
) -> ParseKey:
return cls(
source_sha256=source_sha256,
etl_service=etl_service,
mode=mode,
version=version,
)
@property
def object_suffix(self) -> str:
return f"{self.etl_service}.{self.mode}.v{self.version}.md"

View file

@ -0,0 +1,53 @@
"""Recall and remember parser output, coordinating the index and blob store."""
from __future__ import annotations
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from app.etl_pipeline.cache.persistence import CachedParseRepository
from app.etl_pipeline.cache.schemas import ParseKey
from app.etl_pipeline.cache.storage import MarkdownCacheStore
from app.etl_pipeline.etl_document import EtlResult
logger = logging.getLogger(__name__)
class EtlCacheService:
def __init__(self, session: AsyncSession) -> None:
self._index = CachedParseRepository(session)
self._store = MarkdownCacheStore()
async def recall(self, key: ParseKey) -> EtlResult | None:
"""Return the cached result, or None on a miss."""
row = await self._index.get(key)
if row is None:
return None
try:
markdown = await self._store.load(row.storage_key)
except Exception:
# Index points at a blob that is gone; treat as a miss and re-parse.
logger.warning("Cache blob missing: %s", row.storage_key, exc_info=True)
return None
await self._index.mark_used(row.id)
return EtlResult(
markdown_content=markdown,
etl_service=row.etl_service,
actual_pages=row.actual_pages,
content_type=row.content_type,
)
async def remember(self, key: ParseKey, result: EtlResult) -> None:
"""Store a freshly parsed result for future reuse."""
storage_key = await self._store.save(key, result.markdown_content)
await self._index.insert(
key=key,
content_type=result.content_type,
actual_pages=result.actual_pages,
storage_backend=self._store.backend_name,
storage_key=storage_key,
size_bytes=len(result.markdown_content.encode("utf-8")),
)

View file

@ -0,0 +1,33 @@
"""Cache configuration resolved from the central ``Config``."""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class EtlCacheSettings:
enabled: bool
parser_version: int
ttl_days: int
max_total_bytes: int
eviction_batch: int
# None for any storage_* field means: reuse the main file_storage backend.
storage_backend: str | None
storage_container: str | None
storage_local_root: str | None
def load_etl_cache_settings() -> EtlCacheSettings:
from app.config import config
return EtlCacheSettings(
enabled=config.ETL_CACHE_ENABLED,
parser_version=config.ETL_CACHE_PARSER_VERSION,
ttl_days=config.ETL_CACHE_TTL_DAYS,
max_total_bytes=config.ETL_CACHE_MAX_TOTAL_MB * 1024 * 1024,
eviction_batch=config.ETL_CACHE_EVICTION_BATCH,
storage_backend=config.ETL_CACHE_STORAGE_BACKEND or None,
storage_container=config.ETL_CACHE_STORAGE_CONTAINER or None,
storage_local_root=config.ETL_CACHE_STORAGE_LOCAL_PATH or None,
)

View file

@ -0,0 +1,9 @@
"""Blob storage for cached parse markdown."""
from __future__ import annotations
from .markdown_store import MarkdownCacheStore
__all__ = [
"MarkdownCacheStore",
]

View file

@ -0,0 +1,46 @@
"""Resolve the storage backend for cache blobs: shared main store or a dedicated one."""
from __future__ import annotations
from functools import lru_cache
from app.file_storage.backends.base import StorageBackend
@lru_cache(maxsize=1)
def resolve_cache_backend() -> StorageBackend:
from app.etl_pipeline.cache.settings import load_etl_cache_settings
settings = load_etl_cache_settings()
if not settings.storage_backend:
from app.file_storage.factory import get_storage_backend
return get_storage_backend()
backend = settings.storage_backend.strip().lower()
if backend == "azure":
from app.config import config
if not settings.storage_container:
raise ValueError("ETL_CACHE_STORAGE_CONTAINER is required for azure cache.")
if not config.AZURE_STORAGE_CONNECTION_STRING:
raise ValueError(
"AZURE_STORAGE_CONNECTION_STRING is required for azure cache."
)
from app.file_storage.backends.azure import AzureBlobBackend
return AzureBlobBackend(
connection_string=config.AZURE_STORAGE_CONNECTION_STRING,
container=settings.storage_container,
)
if backend == "local":
if not settings.storage_local_root:
raise ValueError("ETL_CACHE_STORAGE_LOCAL_PATH is required for local cache.")
from app.file_storage.backends.local import LocalFileBackend
return LocalFileBackend(settings.storage_local_root)
raise ValueError(f"Unknown ETL_CACHE_STORAGE_BACKEND: {settings.storage_backend!r}")

View file

@ -0,0 +1,35 @@
"""Read and write cached markdown blobs through the resolved backend."""
from __future__ import annotations
from app.etl_pipeline.cache.schemas import ParseKey
from app.etl_pipeline.cache.storage.backend import resolve_cache_backend
from app.etl_pipeline.cache.storage.object_keys import build_parse_object_key
_MARKDOWN_CONTENT_TYPE = "text/markdown; charset=utf-8"
class MarkdownCacheStore:
def __init__(self) -> None:
self._backend = resolve_cache_backend()
@property
def backend_name(self) -> str:
return self._backend.backend_name
async def save(self, key: ParseKey, markdown: str) -> str:
"""Persist the markdown and return its storage key for the index row."""
storage_key = build_parse_object_key(key)
await self._backend.put(
storage_key,
markdown.encode("utf-8"),
content_type=_MARKDOWN_CONTENT_TYPE,
)
return storage_key
async def load(self, storage_key: str) -> str:
chunks = [chunk async for chunk in self._backend.open_stream(storage_key)]
return b"".join(chunks).decode("utf-8")
async def delete(self, storage_key: str) -> None:
await self._backend.delete(storage_key)

View file

@ -0,0 +1,12 @@
"""Object keys for cached markdown, namespaced under a dedicated prefix."""
from __future__ import annotations
from app.etl_pipeline.cache.schemas import ParseKey
CACHE_PREFIX = "etl_cache"
def build_parse_object_key(key: ParseKey) -> str:
# Content-addressed: identical bytes + recipe always map to the same key.
return f"{CACHE_PREFIX}/{key.source_sha256}/{key.object_suffix}"

View file

@ -0,0 +1,11 @@
"""Content-addressed reuse of chunk+embedding output across workspaces."""
from __future__ import annotations
from app.indexing_pipeline.cache.cached_indexing import build_chunk_embeddings
from app.indexing_pipeline.cache.service import EmbeddingCacheService
__all__ = [
"EmbeddingCacheService",
"build_chunk_embeddings",
]

View file

@ -0,0 +1,129 @@
"""Entry point: serve chunk embeddings from cache, embedding only on a miss.
Embeddings are a pure function of the markdown, the embedding model, and the
chunker -- so identical markdown is chunked and embedded once and reused across
workspaces, even when it came from different sources.
"""
from __future__ import annotations
import asyncio
import hashlib
import logging
import numpy as np
from app.config import config
from app.indexing_pipeline.cache.eligibility import is_embedding_cacheable
from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingKey, EmbeddingSet
from app.indexing_pipeline.cache.service import EmbeddingCacheService
from app.indexing_pipeline.cache.settings import load_embedding_cache_settings
from app.indexing_pipeline.document_chunker import chunk_text, chunk_text_hybrid
from app.indexing_pipeline.document_embedder import embed_texts
from app.observability import metrics
logger = logging.getLogger(__name__)
ChunkPair = tuple[str, np.ndarray]
async def build_chunk_embeddings(
markdown: str, *, use_code_chunker: bool
) -> tuple[np.ndarray, list[ChunkPair]]:
"""Return the document-level vector and ordered ``(chunk_text, vector)`` pairs.
Drop-in for the inline chunk+embed step; reuses prior output when the same
markdown has already been embedded with the current model and chunker.
"""
settings = load_embedding_cache_settings()
chunker_kind = "code" if use_code_chunker else "hybrid"
embedding_dim = getattr(config.embedding_model_instance, "dimension", None)
cacheable = is_embedding_cacheable(
cache_enabled=settings.enabled,
embedding_model=config.EMBEDDING_MODEL,
embedding_dim=embedding_dim,
)
if not cacheable:
return await _compute(markdown, use_code_chunker=use_code_chunker)
key = EmbeddingKey(
markdown_sha256=_hash_text(markdown),
embedding_model=config.EMBEDDING_MODEL,
embedding_dim=int(embedding_dim),
chunker_kind=chunker_kind,
chunker_version=settings.chunker_version,
)
cached = await _recall(key)
if cached is not None:
metrics.record_embedding_cache_lookup(
embedding_model=key.embedding_model,
chunker_kind=chunker_kind,
outcome="hit",
)
logger.debug("Embedding cache hit for %s", key.markdown_sha256)
return cached.summary_embedding, [(c.text, c.embedding) for c in cached.chunks]
metrics.record_embedding_cache_lookup(
embedding_model=key.embedding_model, chunker_kind=chunker_kind, outcome="miss"
)
summary_embedding, chunk_pairs = await _compute(
markdown, use_code_chunker=use_code_chunker
)
await _remember(key, summary_embedding, chunk_pairs)
return summary_embedding, chunk_pairs
async def chunk_markdown(markdown: str, *, use_code_chunker: bool) -> list[str]:
"""Chunk markdown into ordered texts with the pipeline's chunker selection."""
if use_code_chunker:
return await asyncio.to_thread(chunk_text, markdown, use_code_chunker=True)
# Table-aware hybrid chunker keeps Markdown tables intact (issue #1334).
return await asyncio.to_thread(chunk_text_hybrid, markdown)
async def embed_batch(texts: list[str]) -> list[np.ndarray]:
"""Embed texts in one batch off the event loop."""
return await asyncio.to_thread(embed_texts, texts)
async def _compute(
markdown: str, *, use_code_chunker: bool
) -> tuple[np.ndarray, list[ChunkPair]]:
chunk_texts = await chunk_markdown(markdown, use_code_chunker=use_code_chunker)
embeddings = await embed_batch([markdown, *chunk_texts])
summary_embedding, *chunk_embeddings = embeddings
return summary_embedding, list(zip(chunk_texts, chunk_embeddings, strict=False))
async def _recall(key: EmbeddingKey) -> EmbeddingSet | None:
# Caching is best-effort: any failure falls through to a normal embed.
try:
from app.tasks.celery_tasks import get_celery_session_maker
async with get_celery_session_maker()() as session:
return await EmbeddingCacheService(session).recall(key)
except Exception:
logger.warning("Embedding cache recall failed; embedding fresh", exc_info=True)
return None
async def _remember(
key: EmbeddingKey, summary_embedding: np.ndarray, chunk_pairs: list[ChunkPair]
) -> None:
try:
from app.tasks.celery_tasks import get_celery_session_maker
embedding_set = EmbeddingSet(
summary_embedding=summary_embedding,
chunks=[CachedChunk(text=text, embedding=vec) for text, vec in chunk_pairs],
)
async with get_celery_session_maker()() as session:
await EmbeddingCacheService(session).remember(key, embedding_set)
except Exception:
logger.warning("Embedding cache write failed; result not cached", exc_info=True)
def _hash_text(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()

View file

@ -0,0 +1,21 @@
"""Gating rule: may this document be served from / written to the embedding cache?"""
from __future__ import annotations
def is_embedding_cacheable(
*,
cache_enabled: bool,
embedding_model: str | None,
embedding_dim: int | None,
) -> bool:
"""Cache only when a concrete embedding model and dimension are configured.
Without a model there is nothing to key against, and without a dimension the
blob's integrity guard cannot run -- both bypass the cache.
"""
if not cache_enabled:
return False
if not embedding_model:
return False
return bool(embedding_dim)

View file

@ -0,0 +1,9 @@
"""Background pruning of the embedding cache by age and size budget."""
from __future__ import annotations
from .task import evict_embedding_cache_task
__all__ = [
"evict_embedding_cache_task",
]

View file

@ -0,0 +1,68 @@
"""Celery task that prunes the embedding cache by TTL, then by size budget."""
from __future__ import annotations
import contextlib
import logging
from datetime import UTC, datetime, timedelta
from app.celery_app import celery_app
from app.etl_pipeline.cache.eviction.policy import select_over_budget
from app.etl_pipeline.cache.schemas import EvictionCandidate
from app.indexing_pipeline.cache.persistence import CachedEmbeddingSetRepository
from app.indexing_pipeline.cache.settings import load_embedding_cache_settings
from app.indexing_pipeline.cache.storage import EmbeddingCacheStore
from app.observability import metrics
from app.tasks.celery_tasks import get_celery_session_maker, run_async_celery_task
logger = logging.getLogger(__name__)
@celery_app.task(name="evict_embedding_cache")
def evict_embedding_cache_task():
return run_async_celery_task(_evict)
async def _evict() -> None:
"""Expire stale entries, then shed the coldest overflow only if still over budget."""
settings = load_embedding_cache_settings()
if not settings.enabled:
return
store = EmbeddingCacheStore()
async with get_celery_session_maker()() as session:
index = CachedEmbeddingSetRepository(session)
cutoff = datetime.now(UTC) - timedelta(days=settings.ttl_days)
expired = await index.select_expired(
cutoff=cutoff, limit=settings.eviction_batch
)
await _drop(index, store, expired, phase="ttl")
total = await index.total_size_bytes()
if total > settings.max_total_bytes:
coldest = await index.select_coldest(limit=settings.eviction_batch)
over_budget = select_over_budget(
coldest,
current_total_bytes=total,
max_total_bytes=settings.max_total_bytes,
)
await _drop(index, store, over_budget, phase="size")
async def _drop(
index: CachedEmbeddingSetRepository,
store: EmbeddingCacheStore,
candidates: list[EvictionCandidate],
*,
phase: str,
) -> None:
if not candidates:
return
for candidate in candidates:
# Drop the index row even if the blob delete fails (orphan blob is harmless).
with contextlib.suppress(Exception):
await store.delete(candidate.storage_key)
await index.delete_by_ids([candidate.id for candidate in candidates])
metrics.record_embedding_cache_eviction(len(candidates), phase=phase)
logger.info("Evicted %d cached embedding sets (%s)", len(candidates), phase)

View file

@ -0,0 +1,11 @@
"""Database access for cached embedding sets."""
from __future__ import annotations
from .models import CachedEmbeddingSet
from .repository import CachedEmbeddingSetRepository
__all__ = [
"CachedEmbeddingSet",
"CachedEmbeddingSetRepository",
]

View file

@ -0,0 +1,47 @@
"""``embedding_cache_sets``: one reusable chunk+embedding set per markdown."""
from __future__ import annotations
from sqlalchemy import (
BigInteger,
Column,
DateTime,
Index,
Integer,
String,
UniqueConstraint,
)
from app.db import BaseModel, TimestampMixin
class CachedEmbeddingSet(BaseModel, TimestampMixin):
__tablename__ = "embedding_cache_sets"
# Key: markdown text + the recipe that turned it into vectors.
markdown_sha256 = Column(String(64), nullable=False)
embedding_model = Column(String(255), nullable=False)
embedding_dim = Column(Integer, nullable=False)
chunker_kind = Column(String(8), nullable=False)
chunker_version = Column(Integer, nullable=False)
# Where the embedding blob lives (kept out of the row to stay small).
storage_backend = Column(String(32), nullable=False)
storage_key = Column(String, nullable=False)
size_bytes = Column(BigInteger, nullable=False)
chunk_count = Column(Integer, nullable=False, default=0, server_default="0")
# Drives eviction (popularity + recency).
times_reused = Column(BigInteger, nullable=False, default=0, server_default="0")
last_used_at = Column(DateTime(timezone=True), nullable=False)
__table_args__ = (
UniqueConstraint(
"markdown_sha256",
"embedding_model",
"chunker_kind",
"chunker_version",
name="uq_embedding_cache_sets_key",
),
Index("ix_embedding_cache_sets_last_used_at", "last_used_at"),
)

View file

@ -0,0 +1,126 @@
"""CRUD and eviction selectors for ``embedding_cache_sets`` (no business rules)."""
from __future__ import annotations
from datetime import UTC, datetime
from sqlalchemy import delete, func, select, update
from sqlalchemy.dialects.postgresql import insert as pg_insert
from sqlalchemy.ext.asyncio import AsyncSession
from app.etl_pipeline.cache.schemas import EvictionCandidate
from app.indexing_pipeline.cache.schemas import EmbeddingKey
from .models import CachedEmbeddingSet
_EVICTION_COLUMNS = (
CachedEmbeddingSet.id,
CachedEmbeddingSet.storage_key,
CachedEmbeddingSet.size_bytes,
CachedEmbeddingSet.last_used_at,
CachedEmbeddingSet.times_reused,
)
def _as_eviction_candidate(row) -> EvictionCandidate:
return EvictionCandidate(
id=row.id,
storage_key=row.storage_key,
size_bytes=row.size_bytes,
last_used_at=row.last_used_at,
times_reused=row.times_reused,
)
class CachedEmbeddingSetRepository:
def __init__(self, session: AsyncSession) -> None:
self._session = session
async def get(self, key: EmbeddingKey) -> CachedEmbeddingSet | None:
result = await self._session.execute(
select(CachedEmbeddingSet).where(
CachedEmbeddingSet.markdown_sha256 == key.markdown_sha256,
CachedEmbeddingSet.embedding_model == key.embedding_model,
CachedEmbeddingSet.chunker_kind == key.chunker_kind,
CachedEmbeddingSet.chunker_version == key.chunker_version,
)
)
return result.scalars().first()
async def insert(
self,
*,
key: EmbeddingKey,
storage_backend: str,
storage_key: str,
size_bytes: int,
chunk_count: int,
) -> None:
# Concurrent writers embed identical markdown, so a lost race is harmless.
now = datetime.now(UTC)
await self._session.execute(
pg_insert(CachedEmbeddingSet)
.values(
markdown_sha256=key.markdown_sha256,
embedding_model=key.embedding_model,
embedding_dim=key.embedding_dim,
chunker_kind=key.chunker_kind,
chunker_version=key.chunker_version,
storage_backend=storage_backend,
storage_key=storage_key,
size_bytes=size_bytes,
chunk_count=chunk_count,
times_reused=0,
last_used_at=now,
created_at=now,
)
.on_conflict_do_nothing(constraint="uq_embedding_cache_sets_key")
)
await self._session.commit()
async def mark_used(self, row_id: int) -> None:
await self._session.execute(
update(CachedEmbeddingSet)
.where(CachedEmbeddingSet.id == row_id)
.values(
times_reused=CachedEmbeddingSet.times_reused + 1,
last_used_at=datetime.now(UTC),
)
)
await self._session.commit()
async def total_size_bytes(self) -> int:
result = await self._session.execute(
select(func.coalesce(func.sum(CachedEmbeddingSet.size_bytes), 0))
)
return int(result.scalar() or 0)
async def select_expired(
self, *, cutoff: datetime, limit: int
) -> list[EvictionCandidate]:
result = await self._session.execute(
select(*_EVICTION_COLUMNS)
.where(CachedEmbeddingSet.last_used_at < cutoff)
.order_by(CachedEmbeddingSet.last_used_at.asc())
.limit(limit)
)
return [_as_eviction_candidate(row) for row in result]
async def select_coldest(self, *, limit: int) -> list[EvictionCandidate]:
result = await self._session.execute(
select(*_EVICTION_COLUMNS)
.order_by(
CachedEmbeddingSet.times_reused.asc(),
CachedEmbeddingSet.last_used_at.asc(),
)
.limit(limit)
)
return [_as_eviction_candidate(row) for row in result]
async def delete_by_ids(self, ids: list[int]) -> None:
if not ids:
return
await self._session.execute(
delete(CachedEmbeddingSet).where(CachedEmbeddingSet.id.in_(ids))
)
await self._session.commit()

View file

@ -0,0 +1,12 @@
"""Pure value objects for the embedding cache."""
from __future__ import annotations
from .embedding_key import EmbeddingKey
from .embedding_set import CachedChunk, EmbeddingSet
__all__ = [
"CachedChunk",
"EmbeddingKey",
"EmbeddingSet",
]

View file

@ -0,0 +1,27 @@
"""Identity of a cacheable embedding set: equal keys yield identical vectors.
Embeddings depend on the markdown text, the embedding model, and the chunker --
never on how the markdown was produced. So the key is the markdown's own hash
plus the model and chunker recipe, not the upstream parse identity.
"""
from __future__ import annotations
import hashlib
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class EmbeddingKey:
markdown_sha256: str
embedding_model: str
embedding_dim: int
chunker_kind: str
chunker_version: int
@property
def object_suffix(self) -> str:
# Fingerprint the model so distinct models never share a blob, while the
# markdown hash (the object's folder) stays human-readable.
fingerprint = hashlib.sha256(self.embedding_model.encode("utf-8")).hexdigest()
return f"{fingerprint[:16]}.{self.chunker_kind}.v{self.chunker_version}.emb"

View file

@ -0,0 +1,29 @@
"""The cached payload: a document's chunk texts paired with their vectors."""
from __future__ import annotations
from dataclasses import dataclass
import numpy as np
@dataclass(frozen=True, slots=True)
class CachedChunk:
text: str
embedding: np.ndarray
@dataclass(frozen=True, slots=True)
class EmbeddingSet:
"""Everything the indexer needs to rebuild a document's chunks without embedding.
``summary_embedding`` is the document-level vector; ``chunks`` are the ordered
chunk texts and their vectors.
"""
summary_embedding: np.ndarray
chunks: list[CachedChunk]
@property
def chunk_count(self) -> int:
return len(self.chunks)

View file

@ -0,0 +1,71 @@
"""Serialize an EmbeddingSet to a compact, self-describing blob (no pickle).
Layout: ``MAGIC | uint32 header_len | json header | float32 matrix``. The header
carries the dim, chunk count, and ordered chunk texts; the matrix holds the
summary vector followed by one row per chunk, all float32 for compactness.
"""
from __future__ import annotations
import json
import struct
import numpy as np
from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingSet
# Marker at the start of every blob: "SurfSense EMBeddings, version 1"-> SSEMB1. Lets us
# reject foreign blobs and bump the trailing digit if the layout ever changes.
_MAGIC = b"SSEMB1"
# 4-byte big-endian unsigned int written before the variable-length JSON header,
# so the reader knows where the header ends and the float matrix begins.
_HEADER_LEN = struct.Struct(">I")
def serialize(embedding_set: EmbeddingSet) -> bytes:
summary = np.asarray(embedding_set.summary_embedding, dtype=np.float32).reshape(-1)
dim = int(summary.shape[0])
rows = [summary]
texts: list[str] = []
for chunk in embedding_set.chunks:
vector = np.asarray(chunk.embedding, dtype=np.float32).reshape(-1)
if vector.shape[0] != dim:
raise ValueError("All vectors in an embedding set must share one dimension.")
rows.append(vector)
texts.append(chunk.text)
matrix = np.stack(rows, axis=0)
header = json.dumps(
{"dim": dim, "count": len(texts), "texts": texts}, ensure_ascii=False
).encode("utf-8")
return b"".join(
[_MAGIC, _HEADER_LEN.pack(len(header)), header, matrix.tobytes(order="C")]
)
def deserialize(blob: bytes) -> EmbeddingSet:
view = memoryview(blob)
if bytes(view[: len(_MAGIC)]) != _MAGIC:
raise ValueError("Unrecognized embedding cache blob.")
offset = len(_MAGIC)
(header_len,) = _HEADER_LEN.unpack(view[offset : offset + _HEADER_LEN.size])
offset += _HEADER_LEN.size
header = json.loads(bytes(view[offset : offset + header_len]).decode("utf-8"))
offset += header_len
dim = int(header["dim"])
count = int(header["count"])
texts: list[str] = header["texts"]
matrix = np.frombuffer(view[offset:], dtype=np.float32)
if matrix.shape[0] != (count + 1) * dim:
raise ValueError("Embedding cache blob is truncated or corrupt.")
matrix = matrix.reshape(count + 1, dim)
return EmbeddingSet(
summary_embedding=matrix[0],
chunks=[CachedChunk(text=texts[i], embedding=matrix[i + 1]) for i in range(count)],
)

View file

@ -0,0 +1,51 @@
"""Recall and remember embedding sets, coordinating the index and blob store."""
from __future__ import annotations
import logging
from sqlalchemy.ext.asyncio import AsyncSession
from app.indexing_pipeline.cache.persistence import CachedEmbeddingSetRepository
from app.indexing_pipeline.cache.schemas import EmbeddingKey, EmbeddingSet
from app.indexing_pipeline.cache.storage import EmbeddingCacheStore
logger = logging.getLogger(__name__)
class EmbeddingCacheService:
def __init__(self, session: AsyncSession) -> None:
self._index = CachedEmbeddingSetRepository(session)
self._store = EmbeddingCacheStore()
async def recall(self, key: EmbeddingKey) -> EmbeddingSet | None:
"""Return the cached embedding set, or None on a miss."""
row = await self._index.get(key)
if row is None:
return None
try:
embedding_set = await self._store.load(row.storage_key)
except Exception:
# Index points at a blob that is gone; treat as a miss and re-embed.
logger.warning("Cache blob missing: %s", row.storage_key, exc_info=True)
return None
if int(embedding_set.summary_embedding.shape[0]) != key.embedding_dim:
# A model swapped its dimension under a reused name; never serve it.
logger.warning("Cached embedding dimension mismatch: %s", row.storage_key)
return None
await self._index.mark_used(row.id)
return embedding_set
async def remember(self, key: EmbeddingKey, embedding_set: EmbeddingSet) -> None:
"""Store a freshly embedded set for future reuse."""
storage_key, size_bytes = await self._store.save(key, embedding_set)
await self._index.insert(
key=key,
storage_backend=self._store.backend_name,
storage_key=storage_key,
size_bytes=size_bytes,
chunk_count=embedding_set.chunk_count,
)

View file

@ -0,0 +1,30 @@
"""Embedding-cache configuration resolved from the central ``Config``.
The blob backend is intentionally not configured here: it is shared with the ETL
parse cache (see ``ETL_CACHE_STORAGE_*``).
"""
from __future__ import annotations
from dataclasses import dataclass
@dataclass(frozen=True)
class EmbeddingCacheSettings:
enabled: bool
chunker_version: int
ttl_days: int
max_total_bytes: int
eviction_batch: int
def load_embedding_cache_settings() -> EmbeddingCacheSettings:
from app.config import config
return EmbeddingCacheSettings(
enabled=config.EMBEDDING_CACHE_ENABLED,
chunker_version=config.EMBEDDING_CACHE_CHUNKER_VERSION,
ttl_days=config.EMBEDDING_CACHE_TTL_DAYS,
max_total_bytes=config.EMBEDDING_CACHE_MAX_TOTAL_MB * 1024 * 1024,
eviction_batch=config.EMBEDDING_CACHE_EVICTION_BATCH,
)

View file

@ -0,0 +1,9 @@
"""Blob storage for cached embedding sets."""
from __future__ import annotations
from .embedding_store import EmbeddingCacheStore
__all__ = [
"EmbeddingCacheStore",
]

View file

@ -0,0 +1,39 @@
"""Read and write cached embedding blobs through the shared cache backend.
The blob backend is shared with the ETL parse cache (same bucket / root), so
markdown and its embeddings live side by side; only the object prefix differs.
"""
from __future__ import annotations
from app.etl_pipeline.cache.storage.backend import resolve_cache_backend
from app.indexing_pipeline.cache.schemas import EmbeddingKey, EmbeddingSet
from app.indexing_pipeline.cache.serialization import deserialize, serialize
from app.indexing_pipeline.cache.storage.object_keys import build_embedding_object_key
_EMBEDDING_CONTENT_TYPE = "application/octet-stream"
class EmbeddingCacheStore:
def __init__(self) -> None:
self._backend = resolve_cache_backend()
@property
def backend_name(self) -> str:
return self._backend.backend_name
async def save(self, key: EmbeddingKey, embedding_set: EmbeddingSet) -> tuple[str, int]:
"""Persist the embedding set and return its storage key and byte size."""
blob = serialize(embedding_set)
storage_key = build_embedding_object_key(key)
await self._backend.put(
storage_key, blob, content_type=_EMBEDDING_CONTENT_TYPE
)
return storage_key, len(blob)
async def load(self, storage_key: str) -> EmbeddingSet:
chunks = [chunk async for chunk in self._backend.open_stream(storage_key)]
return deserialize(b"".join(chunks))
async def delete(self, storage_key: str) -> None:
await self._backend.delete(storage_key)

View file

@ -0,0 +1,12 @@
"""Object keys for cached embedding sets, namespaced under a dedicated prefix."""
from __future__ import annotations
from app.indexing_pipeline.cache.schemas import EmbeddingKey
CACHE_PREFIX = "embedding_cache"
def build_embedding_object_key(key: EmbeddingKey) -> str:
# Content-addressed: identical markdown + recipe always map to the same key.
return f"{CACHE_PREFIX}/{key.markdown_sha256}/{key.object_suffix}"

View file

@ -0,0 +1,56 @@
"""Diff a document's existing chunk rows against its freshly chunked texts.
Embeddings are a pure function of chunk text, so a row whose content reappears
in the new chunking keeps its embedding (and its HNSW/GIN index entries); only
genuinely new texts are embedded and only vanished rows are deleted. Matching
is a greedy multiset match on content in document order, so duplicate
boilerplate chunks pair up one-to-one and reordered chunks become cheap
position updates instead of delete+reinsert.
"""
from __future__ import annotations
from collections import defaultdict, deque
from dataclasses import dataclass
@dataclass(frozen=True, slots=True)
class ExistingChunk:
id: int
content: str
position: int
@dataclass(frozen=True, slots=True)
class ChunkPlan:
"""The minimal set of writes that turns the stored chunks into the new ones.
``reused`` holds only kept rows whose position actually changed; rows that
match in place need no write at all. Kept-row count (for metrics) is
``len(existing) - len(to_delete)``.
"""
reused: list[tuple[int, int]] # (existing_chunk_id, new_position)
to_embed: list[tuple[int, str]] # (new_position, text)
to_delete: list[int] # existing chunk ids
def reconcile(existing: list[ExistingChunk], new_texts: list[str]) -> ChunkPlan:
available: dict[str, deque[ExistingChunk]] = defaultdict(deque)
for chunk in sorted(existing, key=lambda c: c.position):
available[chunk.content].append(chunk)
reused: list[tuple[int, int]] = []
to_embed: list[tuple[int, str]] = []
for new_position, text in enumerate(new_texts):
matches = available.get(text)
if matches:
chunk = matches.popleft()
if chunk.position != new_position:
reused.append((chunk.id, new_position))
else:
to_embed.append((new_position, text))
to_delete = [chunk.id for queue in available.values() for chunk in queue]
return ChunkPlan(reused=reused, to_embed=to_embed, to_delete=to_delete)

View file

@ -8,7 +8,7 @@ from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field
from datetime import UTC, datetime
from sqlalchemy import delete, select
from sqlalchemy import delete, select, update
from sqlalchemy.exc import IntegrityError
from sqlalchemy.ext.asyncio import AsyncSession
@ -19,9 +19,10 @@ from app.db import (
DocumentStatus,
DocumentType,
)
from app.indexing_pipeline.cache import build_chunk_embeddings
from app.indexing_pipeline.cache.cached_indexing import chunk_markdown, embed_batch
from app.indexing_pipeline.chunk_reconciler import ExistingChunk, reconcile
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_chunker import chunk_text, chunk_text_hybrid
from app.indexing_pipeline.document_embedder import embed_texts
from app.indexing_pipeline.document_hashing import (
compute_content_hash,
compute_identifier_hash,
@ -380,53 +381,34 @@ class IndexingPipelineService:
content = connector_doc.source_markdown
await self.session.execute(
delete(Chunk).where(Chunk.document_id == document.id)
)
t_step = time.perf_counter()
if connector_doc.should_use_code_chunker:
chunk_texts = await asyncio.to_thread(
chunk_text,
connector_doc.source_markdown,
use_code_chunker=True,
existing = await self._load_existing_chunks(document.id)
if existing and self._reconcile_enabled():
chunk_count = await self._reindex_incrementally(
document, content, connector_doc, existing
)
else:
# Use the table-aware hybrid chunker so Markdown tables are not
# split mid-row (see issue #1334).
chunk_texts = await asyncio.to_thread(
chunk_text_hybrid,
connector_doc.source_markdown,
chunk_count = await self._reindex_from_scratch(
document, content, connector_doc
)
texts_to_embed = [content, *chunk_texts]
embeddings = await asyncio.to_thread(embed_texts, texts_to_embed)
summary_embedding, *chunk_embeddings = embeddings
chunks = [
Chunk(content=text, embedding=emb)
for text, emb in zip(chunk_texts, chunk_embeddings, strict=False)
]
perf.info(
"[indexing] chunk+embed doc=%d chunks=%d in %.3fs",
document.id,
len(chunks),
chunk_count,
time.perf_counter() - t_step,
)
document.content = content
document.embedding = summary_embedding
attach_chunks_to_document(document, chunks)
document.updated_at = datetime.now(UTC)
document.status = DocumentStatus.ready()
await self.session.commit()
perf.info(
"[indexing] index TOTAL doc=%d chunks=%d in %.3fs",
document.id,
len(chunks),
chunk_count,
time.perf_counter() - t_index,
)
log_index_success(ctx, chunk_count=len(chunks))
log_index_success(ctx, chunk_count=chunk_count)
outcome_status = "success"
await self._enqueue_ai_sort_if_enabled(document)
@ -483,6 +465,92 @@ class IndexingPipelineService:
persist_span_cm.__exit__(*sys.exc_info())
return document
@staticmethod
def _reconcile_enabled() -> bool:
from app.config import config
return config.CHUNK_RECONCILE_ENABLED
async def _load_existing_chunks(self, document_id: int) -> list[ExistingChunk]:
result = await self.session.execute(
select(Chunk.id, Chunk.content, Chunk.position).where(
Chunk.document_id == document_id
)
)
return [
ExistingChunk(id=row.id, content=row.content, position=row.position)
for row in result
]
async def _reindex_from_scratch(
self, document: Document, content: str, connector_doc: ConnectorDocument
) -> int:
"""First index (or kill-switched re-index): cache-aware full chunk+embed."""
await self.session.execute(
delete(Chunk).where(Chunk.document_id == document.id)
)
summary_embedding, chunk_pairs = await build_chunk_embeddings(
content,
use_code_chunker=connector_doc.should_use_code_chunker,
)
chunks = [
Chunk(content=text, embedding=emb, position=i)
for i, (text, emb) in enumerate(chunk_pairs)
]
document.embedding = summary_embedding
attach_chunks_to_document(document, chunks)
return len(chunks)
async def _reindex_incrementally(
self,
document: Document,
content: str,
connector_doc: ConnectorDocument,
existing: list[ExistingChunk],
) -> int:
"""Edit path: keep rows whose text survived, embed only new texts.
Unchanged rows keep their embedding and their HNSW/GIN index entries;
moved rows get a position-only UPDATE, which touches neither index.
"""
new_texts = await chunk_markdown(
content, use_code_chunker=connector_doc.should_use_code_chunker
)
plan = reconcile(existing, new_texts)
# One batch: the document-level summary vector plus the missing chunks.
embeddings = await embed_batch([content, *[t for _, t in plan.to_embed]])
summary_embedding, *new_embeddings = embeddings
if plan.reused:
await self.session.execute(
update(Chunk),
[{"id": cid, "position": pos} for cid, pos in plan.reused],
)
if plan.to_delete:
await self.session.execute(
delete(Chunk).where(Chunk.id.in_(plan.to_delete))
)
self.session.add_all(
Chunk(
content=text,
embedding=emb,
position=pos,
document_id=document.id,
)
for (pos, text), emb in zip(plan.to_embed, new_embeddings, strict=True)
)
document.embedding = summary_embedding
ot_metrics.record_chunk_reconcile(
reused=len(existing) - len(plan.to_delete),
embedded=len(plan.to_embed),
deleted=len(plan.to_delete),
)
return len(new_texts)
async def _enqueue_ai_sort_if_enabled(self, document: Document) -> None:
"""Fire-and-forget: enqueue incremental AI sort if the search space has it enabled."""
try:

View file

@ -289,6 +289,49 @@ def _etl_extract_outcome():
)
@lru_cache(maxsize=1)
def _etl_cache_lookups():
return _get_meter().create_counter(
"surfsense.etl.cache.lookups",
description="Count of ETL parse-cache lookups by outcome (hit/miss).",
)
@lru_cache(maxsize=1)
def _etl_cache_evictions():
return _get_meter().create_counter(
"surfsense.etl.cache.evictions",
description="Count of ETL parse-cache entries evicted, by phase.",
)
@lru_cache(maxsize=1)
def _embedding_cache_lookups():
return _get_meter().create_counter(
"surfsense.embedding.cache.lookups",
description="Count of embedding (chunk+embedding) cache lookups by outcome (hit/miss).",
)
@lru_cache(maxsize=1)
def _embedding_cache_evictions():
return _get_meter().create_counter(
"surfsense.embedding.cache.evictions",
description="Count of embedding cache entries evicted, by phase.",
)
@lru_cache(maxsize=1)
def _chunk_reconcile_chunks():
return _get_meter().create_counter(
"surfsense.indexing.reconcile.chunks",
description=(
"Chunks handled by incremental re-indexing, by outcome "
"(reused/embedded/deleted)."
),
)
@lru_cache(maxsize=1)
def _celery_heartbeat_refreshes():
return _get_meter().create_counter(
@ -670,6 +713,61 @@ def record_etl_extract_outcome(
)
def record_etl_cache_lookup(
*, etl_service: str | None, mode: str | None, outcome: str
) -> None:
"""Record a parse-cache lookup. ``outcome`` is ``hit`` or ``miss``."""
_add(
_etl_cache_lookups(),
1,
{
"etl.service": etl_service or "unknown",
"mode": mode or "unknown",
"outcome": outcome,
},
)
def record_etl_cache_eviction(count: int, *, phase: str) -> None:
"""Record evicted entries. ``phase`` is ``ttl`` or ``size``."""
if count <= 0:
return
_add(_etl_cache_evictions(), count, {"phase": phase})
def record_embedding_cache_lookup(
*, embedding_model: str | None, chunker_kind: str | None, outcome: str
) -> None:
"""Record an embedding-cache lookup. ``outcome`` is ``hit`` or ``miss``."""
_add(
_embedding_cache_lookups(),
1,
{
"embedding.model": embedding_model or "unknown",
"chunker.kind": chunker_kind or "unknown",
"outcome": outcome,
},
)
def record_embedding_cache_eviction(count: int, *, phase: str) -> None:
"""Record evicted entries. ``phase`` is ``ttl`` or ``size``."""
if count <= 0:
return
_add(_embedding_cache_evictions(), count, {"phase": phase})
def record_chunk_reconcile(*, reused: int, embedded: int, deleted: int) -> None:
"""Record an incremental re-index: how many chunks were kept vs recomputed."""
for outcome, count in (
("reused", reused),
("embedded", embedded),
("deleted", deleted),
):
if count > 0:
_add(_chunk_reconcile_chunks(), count, {"outcome": outcome})
def record_celery_heartbeat_refresh(*, heartbeat_type: str) -> None:
_add(_celery_heartbeat_refreshes(), 1, {"heartbeat.type": heartbeat_type})
@ -863,9 +961,14 @@ __all__ = [
"record_celery_queue_latency",
"record_chat_request_duration",
"record_chat_request_outcome",
"record_chunk_reconcile",
"record_compaction_run",
"record_connector_sync_duration",
"record_connector_sync_outcome",
"record_embedding_cache_eviction",
"record_embedding_cache_lookup",
"record_etl_cache_eviction",
"record_etl_cache_lookup",
"record_etl_extract_duration",
"record_etl_extract_outcome",
"record_indexing_document_duration",

View file

@ -420,7 +420,10 @@ class ChucksHybridSearchRetriever:
select(
Chunk.id.label("chunk_id"),
func.row_number()
.over(partition_by=Chunk.document_id, order_by=Chunk.id)
.over(
partition_by=Chunk.document_id,
order_by=(Chunk.position, Chunk.id),
)
.label("rn"),
)
.where(Chunk.document_id.in_(doc_ids))
@ -441,7 +444,7 @@ class ChucksHybridSearchRetriever:
select(Chunk.id, Chunk.content, Chunk.document_id)
.join(numbered, Chunk.id == numbered.c.chunk_id)
.where(chunk_filter)
.order_by(Chunk.document_id, Chunk.id)
.order_by(Chunk.document_id, Chunk.position, Chunk.id)
)
t_fetch = time.perf_counter()

View file

@ -357,7 +357,10 @@ class DocumentHybridSearchRetriever:
select(
Chunk.id.label("chunk_id"),
func.row_number()
.over(partition_by=Chunk.document_id, order_by=Chunk.id)
.over(
partition_by=Chunk.document_id,
order_by=(Chunk.position, Chunk.id),
)
.label("rn"),
)
.where(Chunk.document_id.in_(doc_ids))
@ -369,7 +372,7 @@ class DocumentHybridSearchRetriever:
select(Chunk.id, Chunk.content, Chunk.document_id)
.join(numbered, Chunk.id == numbered.c.chunk_id)
.where(numbered.c.rn <= _MAX_FETCH_CHUNKS_PER_DOC)
.order_by(Chunk.document_id, Chunk.id)
.order_by(Chunk.document_id, Chunk.position, Chunk.id)
)
t_fetch = time.perf_counter()

View file

@ -1014,8 +1014,8 @@ async def get_document_by_chunk_id(
.filter(
Chunk.document_id == document.id,
or_(
Chunk.created_at < chunk.created_at,
and_(Chunk.created_at == chunk.created_at, Chunk.id < chunk.id),
Chunk.position < chunk.position,
and_(Chunk.position == chunk.position, Chunk.id < chunk.id),
),
)
)
@ -1027,7 +1027,7 @@ async def get_document_by_chunk_id(
windowed_result = await session.execute(
select(Chunk)
.filter(Chunk.document_id == document.id)
.order_by(Chunk.created_at, Chunk.id)
.order_by(Chunk.position, Chunk.id)
.offset(start)
.limit(end - start)
)
@ -1137,7 +1137,7 @@ async def get_document_chunks_paginated(
chunks_result = await session.execute(
select(Chunk)
.filter(Chunk.document_id == document_id)
.order_by(Chunk.created_at, Chunk.id)
.order_by(Chunk.position, Chunk.id)
.offset(offset)
.limit(page_size)
)

View file

@ -119,7 +119,7 @@ async def get_editor_content(
chunk_contents_result = await session.execute(
select(Chunk.content)
.filter(Chunk.document_id == document_id)
.order_by(Chunk.id)
.order_by(Chunk.position, Chunk.id)
)
chunk_contents = chunk_contents_result.scalars().all()
@ -205,7 +205,7 @@ async def download_document_markdown(
chunk_contents_result = await session.execute(
select(Chunk.content)
.filter(Chunk.document_id == document_id)
.order_by(Chunk.id)
.order_by(Chunk.position, Chunk.id)
)
chunk_contents = chunk_contents_result.scalars().all()
if chunk_contents:
@ -354,7 +354,7 @@ async def export_document(
chunk_contents_result = await session.execute(
select(Chunk.content)
.filter(Chunk.document_id == document_id)
.order_by(Chunk.id)
.order_by(Chunk.position, Chunk.id)
)
chunk_contents = chunk_contents_result.scalars().all()
if chunk_contents:

View file

@ -156,7 +156,7 @@ async def _resolve_document_text(
stmt = (
select(Chunk.content)
.where(Chunk.document_id == document.id)
.order_by(Chunk.id)
.order_by(Chunk.position, Chunk.id)
.limit(_MAX_CHUNKS_FOR_CONTEXT)
)
result = await session.execute(stmt)

View file

@ -62,7 +62,7 @@ async def _get_document_markdown(
chunk_result = await session.execute(
select(Chunk.content)
.filter(Chunk.document_id == document.id)
.order_by(Chunk.id)
.order_by(Chunk.position, Chunk.id)
)
chunks = chunk_result.scalars().all()
if chunks:

View file

@ -199,11 +199,12 @@ async def _extract_binary_attachment_markdown(
async def _run_etl_extract(*, file_path: str, filename: str, vision_llm):
"""Lazy-load ETL dependencies to avoid module-import cycles."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
return await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename)
return await extract_with_cache(
EtlRequest(file_path=file_path, filename=filename),
vision_llm=vision_llm,
)

View file

@ -238,9 +238,14 @@ async def _restore_in_place_document(
chunk_embeddings = await asyncio.to_thread(embed_texts, chunk_texts)
session.add_all(
[
Chunk(document_id=doc.id, content=text, embedding=embedding)
for text, embedding in zip(
chunk_texts, chunk_embeddings, strict=True
Chunk(
document_id=doc.id,
content=text,
embedding=embedding,
position=i,
)
for i, (text, embedding) in enumerate(
zip(chunk_texts, chunk_embeddings, strict=True)
)
]
)
@ -336,8 +341,15 @@ async def _reinsert_document_from_revision(
chunk_embeddings = await asyncio.to_thread(embed_texts, chunk_texts)
session.add_all(
[
Chunk(document_id=new_doc.id, content=text, embedding=embedding)
for text, embedding in zip(chunk_texts, chunk_embeddings, strict=True)
Chunk(
document_id=new_doc.id,
content=text,
embedding=embedding,
position=i,
)
for i, (text, embedding) in enumerate(
zip(chunk_texts, chunk_embeddings, strict=True)
)
]
)

View file

@ -525,6 +525,7 @@ async def _simple_chunk_content(content: str, chunk_size: int = 4000) -> list:
Chunk(
content=chunk_text,
embedding=embed_text(chunk_text),
position=len(chunks),
)
)

View file

@ -162,12 +162,13 @@ async def _read_file_content(
All file types (plaintext, audio, direct-convert, document, image) are
handled by ``EtlPipelineService``.
"""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
mode = ProcessingMode.coerce(processing_mode)
result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=file_path, filename=filename, processing_mode=mode)
result = await extract_with_cache(
EtlRequest(file_path=file_path, filename=filename, processing_mode=mode),
vision_llm=vision_llm,
)
return result.markdown_content

View file

@ -1,8 +1,9 @@
"""
File document processors orchestrating content extraction and indexing.
Delegates content extraction to ``app.etl_pipeline.EtlPipelineService`` and
keeps only orchestration concerns (notifications, logging, page limits, saving).
Delegates content extraction to the cache-aware ``extract_with_cache`` facade
(over ``EtlPipelineService``) and keeps only orchestration concerns
(notifications, logging, page limits, saving).
"""
from __future__ import annotations
@ -116,8 +117,8 @@ async def _log_page_divergence(
async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | None:
"""Extract content from a non-document file (plaintext/direct_convert/audio/image) via the unified ETL pipeline."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
await _notify(ctx, "parsing", "Processing file")
await ctx.task_logger.log_task_progress(
@ -136,8 +137,9 @@ async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | No
vision_llm = await get_vision_llm(ctx.session, ctx.search_space_id)
etl_result = await EtlPipelineService(vision_llm=vision_llm).extract(
EtlRequest(file_path=ctx.file_path, filename=ctx.filename)
etl_result = await extract_with_cache(
EtlRequest(file_path=ctx.file_path, filename=ctx.filename),
vision_llm=vision_llm,
)
with contextlib.suppress(Exception):
@ -183,8 +185,8 @@ async def _process_non_document_upload(ctx: _ProcessingContext) -> Document | No
async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
"""Route a document file to the configured ETL service via the unified pipeline."""
from app.etl_pipeline.cache import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.services.etl_credit_service import (
EtlCreditService,
InsufficientCreditsError,
@ -237,13 +239,14 @@ async def _process_document_upload(ctx: _ProcessingContext) -> Document | None:
vision_llm = await get_vision_llm(ctx.session, ctx.search_space_id)
etl_result = await EtlPipelineService(vision_llm=vision_llm).extract(
etl_result = await extract_with_cache(
EtlRequest(
file_path=ctx.file_path,
filename=ctx.filename,
estimated_pages=estimated_pages,
processing_mode=mode,
)
),
vision_llm=vision_llm,
)
with contextlib.suppress(Exception):
@ -381,7 +384,6 @@ async def _extract_file_content(
Tuple of (markdown_content, etl_service_name, billable_pages).
"""
from app.etl_pipeline.etl_document import EtlRequest, ProcessingMode
from app.etl_pipeline.etl_pipeline_service import EtlPipelineService
from app.etl_pipeline.file_classifier import (
FileCategory,
classify_file as etl_classify,
@ -432,13 +434,16 @@ async def _extract_file_content(
vision_llm = await get_vision_llm(session, search_space_id)
result = await EtlPipelineService(vision_llm=vision_llm).extract(
from app.etl_pipeline.cache import extract_with_cache
result = await extract_with_cache(
EtlRequest(
file_path=file_path,
filename=filename,
estimated_pages=estimated_pages,
processing_mode=mode,
)
),
vision_llm=vision_llm,
)
with contextlib.suppress(Exception):

View file

@ -188,8 +188,10 @@ async def create_document_chunks(content: str) -> list[Chunk]:
chunk_texts = [c.text for c in config.chunker_instance.chunk(content)]
chunk_embeddings = await asyncio.to_thread(embed_texts, chunk_texts)
return [
Chunk(content=text, embedding=emb)
for text, emb in zip(chunk_texts, chunk_embeddings, strict=False)
Chunk(content=text, embedding=emb, position=i)
for i, (text, emb) in enumerate(
zip(chunk_texts, chunk_embeddings, strict=False)
)
]

View file

@ -13,6 +13,14 @@ TEST_DATABASE_URL = os.environ.get("TEST_DATABASE_URL", _DEFAULT_TEST_DB)
# DATABASE_URL in the environment (e.g. from .env or shell profile).
os.environ["DATABASE_URL"] = TEST_DATABASE_URL
# Integration tests authenticate over HTTP via email/password, so the
# password-auth routers must be mounted (they are skipped under AUTH_TYPE=GOOGLE).
# setdefault (not load_dotenv, which runs later with override=False) lets a
# developer's .env=GOOGLE be overridden here while still honouring an explicitly
# exported shell AUTH_TYPE.
os.environ.setdefault("AUTH_TYPE", "LOCAL")
os.environ.setdefault("REGISTRATION_ENABLED", "TRUE")
import pytest # noqa: E402
from app.db import DocumentType # noqa: E402

View file

@ -57,9 +57,9 @@ def install(patches: list[Any]) -> None:
# Consumers that did `from app.utils.document_converters import embed_text/texts`
("app.indexing_pipeline.document_embedder.embed_text", fake_embed_text),
("app.indexing_pipeline.document_embedder.embed_texts", fake_embed_texts),
# Pipeline service binding (the actual call site for indexing.index)
# Index-cache facade binding (the actual call site for indexing.index)
(
"app.indexing_pipeline.indexing_pipeline_service.embed_texts",
"app.indexing_pipeline.cache.cached_indexing.embed_texts",
fake_embed_texts,
),
]

View file

@ -123,11 +123,24 @@ async def db_search_space(db_session: AsyncSession, db_user: User) -> SearchSpac
return space
@pytest.fixture(autouse=True)
def _derivation_caches_disabled(monkeypatch):
"""Keep integration tests hermetic regardless of the developer's .env.
With the embedding cache enabled, a successful index of some markdown makes
every later index of the same markdown a cache hit -- silently bypassing
patched ``embed_texts`` fakes/failure injections in unrelated tests. Cache
tests opt back in explicitly via ``monkeypatch.setattr``.
"""
monkeypatch.setattr(app_config, "ETL_CACHE_ENABLED", False)
monkeypatch.setattr(app_config, "EMBEDDING_CACHE_ENABLED", False)
@pytest.fixture
def patched_embed_texts(monkeypatch) -> MagicMock:
mock = MagicMock(side_effect=lambda texts: [[0.1] * _EMBEDDING_DIM for _ in texts])
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.embed_texts",
"app.indexing_pipeline.cache.cached_indexing.embed_texts",
mock,
)
return mock
@ -137,7 +150,7 @@ def patched_embed_texts(monkeypatch) -> MagicMock:
def patched_embed_texts_raises(monkeypatch) -> MagicMock:
mock = MagicMock(side_effect=RuntimeError("Embedding unavailable"))
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.embed_texts",
"app.indexing_pipeline.cache.cached_indexing.embed_texts",
mock,
)
return mock
@ -147,11 +160,11 @@ def patched_embed_texts_raises(monkeypatch) -> MagicMock:
def patched_chunk_text(monkeypatch) -> MagicMock:
mock = MagicMock(return_value=["Test chunk content."])
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.chunk_text",
"app.indexing_pipeline.cache.cached_indexing.chunk_text",
mock,
)
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.chunk_text_hybrid",
"app.indexing_pipeline.cache.cached_indexing.chunk_text_hybrid",
mock,
)
return mock

View file

@ -283,11 +283,11 @@ async def credits():
def _mock_external_apis(monkeypatch):
"""Mock LLM, embedding, and chunking — these are external API boundaries."""
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.embed_texts",
"app.indexing_pipeline.cache.cached_indexing.embed_texts",
MagicMock(side_effect=lambda texts: [[0.1] * _EMBEDDING_DIM for _ in texts]),
)
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.chunk_text",
"app.indexing_pipeline.cache.cached_indexing.chunk_text",
MagicMock(return_value=["Test chunk content."]),
)

View file

@ -0,0 +1,32 @@
"""Real-infra fixtures for the parse-cache integration tests.
``cache_local_storage`` points the cache's blob store at a throwaway directory so
tests exercise the real ``LocalFileBackend`` (no cloud, no mocks). ``clean_cache_table``
removes rows written through the facade's own committing session, which the
savepoint-rolled-back ``db_session`` cannot undo.
"""
from __future__ import annotations
import pytest
import pytest_asyncio
from sqlalchemy import text
@pytest.fixture
def cache_local_storage(tmp_path, monkeypatch):
from app.config import config
from app.etl_pipeline.cache.storage.backend import resolve_cache_backend
monkeypatch.setattr(config, "ETL_CACHE_STORAGE_BACKEND", "local")
monkeypatch.setattr(config, "ETL_CACHE_STORAGE_LOCAL_PATH", str(tmp_path))
resolve_cache_backend.cache_clear()
yield tmp_path
resolve_cache_backend.cache_clear()
@pytest_asyncio.fixture
async def clean_cache_table(async_engine):
yield
async with async_engine.begin() as conn:
await conn.execute(text("DELETE FROM etl_cache_parses"))

View file

@ -0,0 +1,84 @@
"""extract_with_cache end-to-end: real DB + real local storage.
The only seam mocked is the parser itself (``EtlPipelineService.extract``) -- the
external boundary the facade wraps. Everything else (eligibility, hashing, recall,
remember, blob I/O) runs for real, so these tests prove the actual cost saving:
identical bytes are parsed once and reused.
"""
from __future__ import annotations
import pytest
from app.config import config
from app.etl_pipeline.cache.cached_extraction import extract_with_cache
from app.etl_pipeline.etl_document import EtlRequest, EtlResult, ProcessingMode
pytestmark = pytest.mark.integration
class _CountingParser:
"""Stand-in for the external parser; records how often it actually ran."""
def __init__(self, **_kwargs) -> None:
pass
calls = 0
async def extract(self, request: EtlRequest) -> EtlResult:
type(self).calls += 1
return EtlResult(
markdown_content="# Parsed once\n",
etl_service="LLAMACLOUD",
actual_pages=3,
content_type="application/pdf",
)
@pytest.fixture
def counting_parser(monkeypatch):
_CountingParser.calls = 0
monkeypatch.setattr(
"app.etl_pipeline.cache.cached_extraction.EtlPipelineService",
_CountingParser,
)
return _CountingParser
async def test_identical_uploads_are_parsed_once_then_served_from_cache(
tmp_path, monkeypatch, counting_parser, cache_local_storage, clean_cache_table
):
monkeypatch.setattr(config, "ETL_CACHE_ENABLED", True)
monkeypatch.setattr(config, "ETL_SERVICE", "LLAMACLOUD")
pdf = tmp_path / "doc.pdf"
pdf.write_bytes(b"%PDF-1.4 unique-bytes-for-this-test")
request = EtlRequest(
file_path=str(pdf), filename="doc.pdf", processing_mode=ProcessingMode.BASIC
)
first = await extract_with_cache(request)
second = await extract_with_cache(request)
assert counting_parser.calls == 1 # second upload reused the cache
assert first.markdown_content == second.markdown_content == "# Parsed once\n"
assert second.actual_pages == 3
assert second.content_type == "application/pdf"
async def test_disabled_cache_parses_every_time(
tmp_path, monkeypatch, counting_parser
):
monkeypatch.setattr(config, "ETL_CACHE_ENABLED", False)
monkeypatch.setattr(config, "ETL_SERVICE", "LLAMACLOUD")
pdf = tmp_path / "doc.pdf"
pdf.write_bytes(b"%PDF-1.4 another-unique-payload")
request = EtlRequest(
file_path=str(pdf), filename="doc.pdf", processing_mode=ProcessingMode.BASIC
)
await extract_with_cache(request)
await extract_with_cache(request)
assert counting_parser.calls == 2 # bypassed: no reuse

View file

@ -0,0 +1,96 @@
"""CachedParseRepository against real Postgres: the SQL behind eviction & dedup.
These verify the parts that only a real database can: coldest-first ordering by
reuse then recency, TTL cutoff selection, the size accumulator, and the
insert-once guarantee under a duplicate key.
"""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import pytest
from app.etl_pipeline.cache.persistence import CachedParseRepository
from app.etl_pipeline.cache.schemas import ParseKey
pytestmark = pytest.mark.integration
def _key(sha: str) -> ParseKey:
return ParseKey.for_document(
sha, etl_service="LLAMACLOUD", mode="basic", version=1
)
async def _insert(repo, *, sha, size=100, storage_key=None):
key = _key(sha)
await repo.insert(
key=key,
content_type="application/pdf",
actual_pages=1,
storage_backend="local",
storage_key=storage_key or f"etl_cache/{sha}.md",
size_bytes=size,
)
return key
async def test_total_size_bytes_sums_all_rows(db_session):
repo = CachedParseRepository(db_session)
await _insert(repo, sha="a" * 64, size=100)
await _insert(repo, sha="b" * 64, size=250)
assert await repo.total_size_bytes() == 350
async def test_select_coldest_orders_by_reuse_then_recency(db_session):
repo = CachedParseRepository(db_session)
ka = await _insert(repo, sha="a" * 64)
kb = await _insert(repo, sha="b" * 64)
kc = await _insert(repo, sha="c" * 64)
# Warm B once and C twice; A stays untouched and should be coldest.
await repo.mark_used((await repo.get(kb)).id)
await repo.mark_used((await repo.get(kc)).id)
await repo.mark_used((await repo.get(kc)).id)
coldest = await repo.select_coldest(limit=10)
ids_by_reuse = [c.id for c in coldest]
assert ids_by_reuse[:3] == [
(await repo.get(ka)).id,
(await repo.get(kb)).id,
(await repo.get(kc)).id,
]
async def test_select_expired_returns_only_rows_older_than_cutoff(db_session):
repo = CachedParseRepository(db_session)
await _insert(repo, sha="a" * 64)
future = datetime.now(UTC) + timedelta(days=1)
past = datetime.now(UTC) - timedelta(days=1)
# Row was just used, so it's older than a future cutoff but not a past one.
assert len(await repo.select_expired(cutoff=future, limit=10)) == 1
assert await repo.select_expired(cutoff=past, limit=10) == []
async def test_duplicate_key_insert_keeps_the_first_row(db_session):
repo = CachedParseRepository(db_session)
key = await _insert(repo, sha="a" * 64, size=100, storage_key="etl_cache/first.md")
# Same content-addressed key (a concurrent re-parse): must be a no-op.
await repo.insert(
key=key,
content_type="application/pdf",
actual_pages=1,
storage_backend="local",
storage_key="etl_cache/second.md",
size_bytes=999,
)
row = await repo.get(key)
assert row.storage_key == "etl_cache/first.md"
assert await repo.total_size_bytes() == 100

View file

@ -0,0 +1,67 @@
"""EtlCacheService end-to-end against real Postgres + real local storage.
Exercises the public cache surface -- ``recall`` / ``remember`` -- with no mocks:
a miss returns nothing, and a remembered parse comes back as an equivalent
``EtlResult`` rebuilt from the row and the blob.
"""
from __future__ import annotations
import pytest
from app.etl_pipeline.cache.schemas import ParseKey
from app.etl_pipeline.cache.service import EtlCacheService
from app.etl_pipeline.etl_document import EtlResult
pytestmark = pytest.mark.integration
def _key(sha: str = "c" * 64) -> ParseKey:
return ParseKey.for_document(
sha, etl_service="LLAMACLOUD", mode="basic", version=1
)
async def test_recall_is_a_miss_for_an_unknown_key(db_session, cache_local_storage):
service = EtlCacheService(db_session)
assert await service.recall(_key()) is None
async def test_remembered_parse_recalls_as_equivalent_result(
db_session, cache_local_storage
):
service = EtlCacheService(db_session)
stored = EtlResult(
markdown_content="# Cached doc\n\nBody paragraph.\n",
etl_service="LLAMACLOUD",
actual_pages=7,
content_type="application/pdf",
)
await service.remember(_key(), stored)
recalled = await service.recall(_key())
assert recalled is not None
assert recalled.markdown_content == stored.markdown_content
assert recalled.etl_service == "LLAMACLOUD"
assert recalled.actual_pages == 7
assert recalled.content_type == "application/pdf"
async def test_repeated_recall_keeps_serving_the_same_content(
db_session, cache_local_storage
):
service = EtlCacheService(db_session)
stored = EtlResult(
markdown_content="# Stable\n",
etl_service="LLAMACLOUD",
actual_pages=1,
content_type="application/pdf",
)
await service.remember(_key(), stored)
first = await service.recall(_key())
second = await service.recall(_key())
assert first is not None and second is not None
assert first.markdown_content == second.markdown_content == "# Stable\n"

View file

@ -0,0 +1,96 @@
"""The eviction task on real infra: TTL expiry first, then coldest-over-budget.
Seeds entries through the real cache (DB rows + local blobs), runs the actual
``_evict`` coroutine, and checks what survives via ``recall`` -- no mocks. TTL and
budget are driven through config so each phase can be exercised in isolation.
"""
from __future__ import annotations
import pytest
from app.config import config
from app.etl_pipeline.cache.eviction.task import _evict
from app.etl_pipeline.cache.schemas import ParseKey
from app.etl_pipeline.cache.service import EtlCacheService
from app.etl_pipeline.etl_document import EtlResult
from app.tasks.celery_tasks import get_celery_session_maker
pytestmark = pytest.mark.integration
def _key(sha: str) -> ParseKey:
return ParseKey.for_document(
sha, etl_service="LLAMACLOUD", mode="basic", version=1
)
def _result(markdown: str) -> EtlResult:
return EtlResult(
markdown_content=markdown,
etl_service="LLAMACLOUD",
actual_pages=1,
content_type="application/pdf",
)
async def _remember(key: ParseKey, result: EtlResult) -> None:
async with get_celery_session_maker()() as session:
await EtlCacheService(session).remember(key, result)
async def _recall(key: ParseKey) -> EtlResult | None:
async with get_celery_session_maker()() as session:
return await EtlCacheService(session).recall(key)
async def test_expired_entries_are_pruned(
monkeypatch, cache_local_storage, clean_cache_table
):
monkeypatch.setattr(config, "ETL_CACHE_ENABLED", True)
monkeypatch.setattr(config, "ETL_CACHE_TTL_DAYS", -1) # cutoff in the future -> stale
monkeypatch.setattr(config, "ETL_CACHE_MAX_TOTAL_MB", 10_000) # size phase no-op
key = _key("a" * 64)
await _remember(key, _result("# stale doc\n"))
await _evict()
assert await _recall(key) is None
async def test_coldest_entries_are_shed_when_over_budget(
monkeypatch, cache_local_storage, clean_cache_table
):
monkeypatch.setattr(config, "ETL_CACHE_ENABLED", True)
monkeypatch.setattr(config, "ETL_CACHE_TTL_DAYS", 3650) # nothing TTL-expired
monkeypatch.setattr(config, "ETL_CACHE_MAX_TOTAL_MB", 1) # ~1 MiB budget
cold = _key("a" * 64)
warm = _key("b" * 64)
# Two ~0.6 MiB entries together exceed the 1 MiB budget; one must go.
await _remember(cold, _result("x" * 600_000))
await _remember(warm, _result("y" * 600_000))
# A reuse makes `warm` warmer than `cold`, so `cold` is the eviction target.
assert await _recall(warm) is not None
await _evict()
assert await _recall(cold) is None
assert await _recall(warm) is not None
async def test_nothing_is_evicted_within_ttl_and_budget(
monkeypatch, cache_local_storage, clean_cache_table
):
monkeypatch.setattr(config, "ETL_CACHE_ENABLED", True)
monkeypatch.setattr(config, "ETL_CACHE_TTL_DAYS", 3650)
monkeypatch.setattr(config, "ETL_CACHE_MAX_TOTAL_MB", 10_000)
key = _key("a" * 64)
await _remember(key, _result("# keep me\n"))
await _evict()
assert await _recall(key) is not None

View file

@ -0,0 +1,42 @@
"""MarkdownCacheStore against a real local filesystem backend (no mocks).
Proves the blob side of the cache: markdown written under a content-addressed key
comes back byte-for-byte, and a delete actually removes it.
"""
from __future__ import annotations
import pytest
from app.etl_pipeline.cache.schemas import ParseKey
from app.etl_pipeline.cache.storage import MarkdownCacheStore
from app.etl_pipeline.cache.storage.object_keys import build_parse_object_key
pytestmark = pytest.mark.integration
def _key() -> ParseKey:
return ParseKey.for_document(
"d" * 64, etl_service="LLAMACLOUD", mode="basic", version=1
)
async def test_save_then_load_round_trips_markdown(cache_local_storage):
store = MarkdownCacheStore()
markdown = "# Title\n\nBody with unicode: café, naïve, 漢字.\n"
storage_key = await store.save(_key(), markdown)
assert storage_key == build_parse_object_key(_key())
assert await store.load(storage_key) == markdown
async def test_delete_removes_the_blob(cache_local_storage):
store = MarkdownCacheStore()
storage_key = await store.save(_key(), "to be deleted")
await store.delete(storage_key)
# Eviction deleted the blob; a later read must fail rather than serve stale.
with pytest.raises(FileNotFoundError):
await store.load(storage_key)

View file

@ -177,7 +177,7 @@ async def test_reindex_sets_status_ready(db_session, db_search_space, db_user, m
async def test_reindex_replaces_chunks(db_session, db_search_space, db_user, mocker):
"""Reindexing replaces old chunks with new content rather than appending."""
mocker.patch(
"app.indexing_pipeline.indexing_pipeline_service.chunk_text_hybrid",
"app.indexing_pipeline.cache.cached_indexing.chunk_text_hybrid",
side_effect=[["Original chunk."], ["Updated chunk."]],
)

View file

@ -0,0 +1,33 @@
"""Real-infra fixtures for the embedding-cache integration tests.
``cache_local_storage`` points the shared cache backend at a throwaway directory
so tests exercise the real ``LocalFileBackend`` (no cloud, no mocks); the
embedding cache reuses the ETL cache backend, hence the ``ETL_CACHE_STORAGE_*``
knobs. ``clean_embedding_cache_table`` removes rows written through the store's
own committing session, which the savepoint-rolled-back ``db_session`` cannot undo.
"""
from __future__ import annotations
import pytest
import pytest_asyncio
from sqlalchemy import text
@pytest.fixture
def cache_local_storage(tmp_path, monkeypatch):
from app.config import config
from app.etl_pipeline.cache.storage.backend import resolve_cache_backend
monkeypatch.setattr(config, "ETL_CACHE_STORAGE_BACKEND", "local")
monkeypatch.setattr(config, "ETL_CACHE_STORAGE_LOCAL_PATH", str(tmp_path))
resolve_cache_backend.cache_clear()
yield tmp_path
resolve_cache_backend.cache_clear()
@pytest_asyncio.fixture
async def clean_embedding_cache_table(async_engine):
yield
async with async_engine.begin() as conn:
await conn.execute(text("DELETE FROM embedding_cache_sets"))

View file

@ -0,0 +1,110 @@
"""CachedEmbeddingSetRepository against real Postgres: the SQL behind eviction & dedup.
These verify the parts only a real database can: the size accumulator,
coldest-first ordering by reuse then recency, TTL cutoff selection, the
insert-once guarantee under a duplicate key, and the reuse counter.
"""
from __future__ import annotations
from datetime import UTC, datetime, timedelta
import pytest
from app.indexing_pipeline.cache.persistence import CachedEmbeddingSetRepository
from app.indexing_pipeline.cache.schemas import EmbeddingKey
pytestmark = pytest.mark.integration
def _key(sha: str) -> EmbeddingKey:
return EmbeddingKey(
markdown_sha256=sha,
embedding_model="test-model",
embedding_dim=4,
chunker_kind="hybrid",
chunker_version=1,
)
async def _insert(repo, *, sha, size=100, storage_key=None, chunk_count=1):
key = _key(sha)
await repo.insert(
key=key,
storage_backend="local",
storage_key=storage_key or f"embedding_cache/{sha}.emb",
size_bytes=size,
chunk_count=chunk_count,
)
return key
async def test_total_size_bytes_sums_all_rows(db_session):
repo = CachedEmbeddingSetRepository(db_session)
await _insert(repo, sha="a" * 64, size=100)
await _insert(repo, sha="b" * 64, size=250)
assert await repo.total_size_bytes() == 350
async def test_select_coldest_orders_by_reuse_then_recency(db_session):
repo = CachedEmbeddingSetRepository(db_session)
ka = await _insert(repo, sha="a" * 64)
kb = await _insert(repo, sha="b" * 64)
kc = await _insert(repo, sha="c" * 64)
# Warm B once and C twice; A stays untouched and should be coldest.
await repo.mark_used((await repo.get(kb)).id)
await repo.mark_used((await repo.get(kc)).id)
await repo.mark_used((await repo.get(kc)).id)
coldest = await repo.select_coldest(limit=10)
assert [c.id for c in coldest][:3] == [
(await repo.get(ka)).id,
(await repo.get(kb)).id,
(await repo.get(kc)).id,
]
async def test_select_expired_returns_only_rows_older_than_cutoff(db_session):
repo = CachedEmbeddingSetRepository(db_session)
await _insert(repo, sha="a" * 64)
future = datetime.now(UTC) + timedelta(days=1)
past = datetime.now(UTC) - timedelta(days=1)
# Row was just used, so it predates a future cutoff but not a past one.
assert len(await repo.select_expired(cutoff=future, limit=10)) == 1
assert await repo.select_expired(cutoff=past, limit=10) == []
async def test_duplicate_key_insert_keeps_the_first_row(db_session):
repo = CachedEmbeddingSetRepository(db_session)
key = await _insert(
repo, sha="a" * 64, size=100, storage_key="embedding_cache/first.emb"
)
# Same content-addressed key (a concurrent re-embed): must be a no-op.
await repo.insert(
key=key,
storage_backend="local",
storage_key="embedding_cache/second.emb",
size_bytes=999,
chunk_count=42,
)
row = await repo.get(key)
assert row.storage_key == "embedding_cache/first.emb"
assert await repo.total_size_bytes() == 100
async def test_mark_used_increments_reuse_count(db_session):
repo = CachedEmbeddingSetRepository(db_session)
key = await _insert(repo, sha="a" * 64)
assert (await repo.get(key)).times_reused == 0
await repo.mark_used((await repo.get(key)).id)
await repo.mark_used((await repo.get(key)).id)
assert (await repo.get(key)).times_reused == 2

View file

@ -0,0 +1,70 @@
"""EmbeddingCacheService end-to-end against real Postgres + real local storage.
Exercises the public cache surface -- ``recall`` / ``remember`` -- with no mocks:
a miss returns nothing, a remembered set comes back as equivalent vectors, and a
dimension mismatch is refused rather than served.
"""
from __future__ import annotations
import numpy as np
import pytest
from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingKey, EmbeddingSet
from app.indexing_pipeline.cache.service import EmbeddingCacheService
pytestmark = pytest.mark.integration
def _key(sha: str = "c" * 64, *, dim: int = 4) -> EmbeddingKey:
return EmbeddingKey(
markdown_sha256=sha,
embedding_model="test-model",
embedding_dim=dim,
chunker_kind="hybrid",
chunker_version=1,
)
async def test_recall_is_a_miss_for_an_unknown_key(db_session, cache_local_storage):
service = EmbeddingCacheService(db_session)
assert await service.recall(_key()) is None
async def test_remembered_set_recalls_as_equivalent_vectors(
db_session, cache_local_storage, clean_embedding_cache_table
):
service = EmbeddingCacheService(db_session)
stored = EmbeddingSet(
summary_embedding=np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32),
chunks=[
CachedChunk("first chunk", np.array([1.0, 0.0, 0.0, 0.0], dtype=np.float32)),
CachedChunk("second chunk", np.array([0.0, 1.0, 0.0, 0.0], dtype=np.float32)),
],
)
await service.remember(_key(), stored)
recalled = await service.recall(_key())
assert recalled is not None
assert np.array_equal(recalled.summary_embedding, stored.summary_embedding)
assert [c.text for c in recalled.chunks] == ["first chunk", "second chunk"]
assert np.array_equal(recalled.chunks[0].embedding, stored.chunks[0].embedding)
assert np.array_equal(recalled.chunks[1].embedding, stored.chunks[1].embedding)
async def test_recall_refuses_a_set_whose_dimension_changed(
db_session, cache_local_storage, clean_embedding_cache_table
):
# A model kept its name but changed its output width: never serve the stale blob.
service = EmbeddingCacheService(db_session)
stored = EmbeddingSet(
summary_embedding=np.array([0.1, 0.2, 0.3, 0.4], dtype=np.float32),
chunks=[CachedChunk("c", np.array([1.0, 2.0, 3.0, 4.0], dtype=np.float32))],
)
await service.remember(_key(dim=4), stored)
# Same identity (model + chunker + markdown), but the caller now expects dim 8.
recalled = await service.recall(_key(dim=8))
assert recalled is None

View file

@ -0,0 +1,63 @@
"""EmbeddingCacheStore against a real local filesystem backend (no mocks).
Proves the blob side of the cache: an embedding set written under a
content-addressed key comes back with identical vectors, and a delete actually
removes it.
"""
from __future__ import annotations
import numpy as np
import pytest
from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingKey, EmbeddingSet
from app.indexing_pipeline.cache.storage import EmbeddingCacheStore
from app.indexing_pipeline.cache.storage.object_keys import build_embedding_object_key
pytestmark = pytest.mark.integration
def _key() -> EmbeddingKey:
return EmbeddingKey(
markdown_sha256="d" * 64,
embedding_model="test-model",
embedding_dim=4,
chunker_kind="hybrid",
chunker_version=1,
)
def _set() -> EmbeddingSet:
return EmbeddingSet(
summary_embedding=np.array([0.5, 0.25, 0.125, 0.0625], dtype=np.float32),
chunks=[
CachedChunk("café, naïve, 漢字", np.array([1, 2, 3, 4], dtype=np.float32)),
CachedChunk("second", np.array([5, 6, 7, 8], dtype=np.float32)),
],
)
async def test_save_then_load_round_trips_the_embedding_set(cache_local_storage):
store = EmbeddingCacheStore()
embedding_set = _set()
storage_key, size_bytes = await store.save(_key(), embedding_set)
loaded = await store.load(storage_key)
assert storage_key == build_embedding_object_key(_key())
assert size_bytes > 0
assert np.array_equal(loaded.summary_embedding, embedding_set.summary_embedding)
assert [c.text for c in loaded.chunks] == ["café, naïve, 漢字", "second"]
assert np.array_equal(loaded.chunks[0].embedding, embedding_set.chunks[0].embedding)
assert np.array_equal(loaded.chunks[1].embedding, embedding_set.chunks[1].embedding)
async def test_delete_removes_the_blob(cache_local_storage):
store = EmbeddingCacheStore()
storage_key, _ = await store.save(_key(), _set())
await store.delete(storage_key)
# Eviction deleted the blob; a later read must fail rather than serve stale.
with pytest.raises(FileNotFoundError):
await store.load(storage_key)

View file

@ -0,0 +1,193 @@
"""Edit path: re-indexing a document diffs chunks instead of replacing them.
Unchanged paragraphs must keep their chunk rows (ids survive -> embeddings and
HNSW entries untouched), only new text is embedded, removed text is deleted,
and (position) keeps presentation order correct throughout.
"""
import pytest
from sqlalchemy import select
from app.db import Chunk, DocumentStatus
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
pytestmark = pytest.mark.integration
_V1 = "Intro paragraph.\n\nBody paragraph.\n\nOutro paragraph."
@pytest.fixture
def paragraph_chunker(monkeypatch):
"""One chunk per markdown paragraph, so edits map to chunk-level diffs."""
def _split(markdown, **_kwargs):
return [p for p in markdown.split("\n\n") if p.strip()]
monkeypatch.setattr(
"app.indexing_pipeline.cache.cached_indexing.chunk_text", _split
)
monkeypatch.setattr(
"app.indexing_pipeline.cache.cached_indexing.chunk_text_hybrid", _split
)
async def _index(service, connector_doc):
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
await service.index(document, connector_doc)
return document
async def _load_chunks(db_session, document_id):
result = await db_session.execute(
select(Chunk)
.where(Chunk.document_id == document_id)
.order_by(Chunk.position, Chunk.id)
)
return result.scalars().all()
@pytest.mark.usefixtures("paragraph_chunker")
async def test_edit_keeps_unchanged_rows_and_embeds_only_the_new_text(
db_session,
db_search_space,
make_connector_document,
patched_embed_texts,
):
service = IndexingPipelineService(session=db_session)
doc_v1 = make_connector_document(
search_space_id=db_search_space.id, source_markdown=_V1
)
document = await _index(service, doc_v1)
ids_v1 = {c.content: c.id for c in await _load_chunks(db_session, document.id)}
patched_embed_texts.reset_mock()
edited = "Intro paragraph.\n\nBody paragraph EDITED.\n\nOutro paragraph."
doc_v2 = make_connector_document(
search_space_id=db_search_space.id, source_markdown=edited
)
await _index(service, doc_v2)
chunks = await _load_chunks(db_session, document.id)
by_content = {c.content: c for c in chunks}
# Untouched paragraphs keep their rows (same ids => embeddings reused,
# no HNSW/GIN churn); the edited paragraph got a fresh row.
assert by_content["Intro paragraph."].id == ids_v1["Intro paragraph."]
assert by_content["Outro paragraph."].id == ids_v1["Outro paragraph."]
assert "Body paragraph." not in by_content
assert by_content["Body paragraph EDITED."].id not in ids_v1.values()
# Exactly one embed call: the document summary plus only the edited text.
(embedded_texts,) = patched_embed_texts.call_args.args
assert embedded_texts == [edited, "Body paragraph EDITED."]
assert [c.position for c in chunks] == [0, 1, 2]
assert [c.content for c in chunks] == [
"Intro paragraph.",
"Body paragraph EDITED.",
"Outro paragraph.",
]
@pytest.mark.usefixtures("paragraph_chunker", "patched_embed_texts")
async def test_head_insert_shifts_positions_without_new_rows_for_old_text(
db_session,
db_search_space,
make_connector_document,
):
service = IndexingPipelineService(session=db_session)
document = await _index(
service,
make_connector_document(
search_space_id=db_search_space.id, source_markdown=_V1
),
)
ids_v1 = {c.content: c.id for c in await _load_chunks(db_session, document.id)}
await _index(
service,
make_connector_document(
search_space_id=db_search_space.id,
source_markdown="Brand new opener.\n\n" + _V1,
),
)
chunks = await _load_chunks(db_session, document.id)
assert [c.content for c in chunks] == [
"Brand new opener.",
"Intro paragraph.",
"Body paragraph.",
"Outro paragraph.",
]
assert [c.position for c in chunks] == [0, 1, 2, 3]
# The three original rows survived the shift.
surviving = {c.content: c.id for c in chunks if c.content in ids_v1}
assert surviving == ids_v1
@pytest.mark.usefixtures("paragraph_chunker", "patched_embed_texts")
async def test_removed_paragraph_is_deleted_and_order_compacts(
db_session,
db_search_space,
make_connector_document,
):
service = IndexingPipelineService(session=db_session)
document = await _index(
service,
make_connector_document(
search_space_id=db_search_space.id, source_markdown=_V1
),
)
ids_v1 = {c.content: c.id for c in await _load_chunks(db_session, document.id)}
await _index(
service,
make_connector_document(
search_space_id=db_search_space.id,
source_markdown="Intro paragraph.\n\nOutro paragraph.",
),
)
chunks = await _load_chunks(db_session, document.id)
assert [(c.content, c.position) for c in chunks] == [
("Intro paragraph.", 0),
("Outro paragraph.", 1),
]
assert chunks[0].id == ids_v1["Intro paragraph."]
assert chunks[1].id == ids_v1["Outro paragraph."]
@pytest.mark.usefixtures("paragraph_chunker", "patched_embed_texts")
async def test_kill_switch_falls_back_to_full_replace(
db_session,
db_search_space,
make_connector_document,
monkeypatch,
):
from app.config import config
service = IndexingPipelineService(session=db_session)
document = await _index(
service,
make_connector_document(
search_space_id=db_search_space.id, source_markdown=_V1
),
)
ids_v1 = {c.id for c in await _load_chunks(db_session, document.id)}
monkeypatch.setattr(config, "CHUNK_RECONCILE_ENABLED", False)
await _index(
service,
make_connector_document(
search_space_id=db_search_space.id,
source_markdown=_V1 + "\n\nAppended paragraph.",
),
)
chunks = await _load_chunks(db_session, document.id)
# Legacy behavior: every row is recreated, even unchanged paragraphs.
assert {c.id for c in chunks}.isdisjoint(ids_v1)
assert [c.position for c in chunks] == [0, 1, 2, 3]
assert DocumentStatus.is_state(document.status, DocumentStatus.READY)

View file

@ -0,0 +1,28 @@
"""Stub the cache package __init__s so unit tests import only pure leaf modules.
The real ``cache``/``storage``/``eviction``/``persistence`` __init__s eagerly
import the facade, file storage, Celery, and ``app.db`` -- none of which a pure
unit test should need. Turning those packages into bare namespace packages lets
``from app.etl_pipeline.cache.<pkg>.<leaf> import ...`` resolve the leaf module
without running the heavy __init__. ``schemas`` is left real (it is pure).
"""
import sys
import types
from pathlib import Path
_CACHE_DIR = Path(__file__).resolve().parents[4] / "app" / "etl_pipeline" / "cache"
def _stub_namespace_package(dotted: str, fs_dir: Path) -> None:
if dotted in sys.modules:
return
module = types.ModuleType(dotted)
module.__path__ = [str(fs_dir)]
module.__package__ = dotted
sys.modules[dotted] = module
_stub_namespace_package("app.etl_pipeline.cache", _CACHE_DIR)
_stub_namespace_package("app.etl_pipeline.cache.storage", _CACHE_DIR / "storage")
_stub_namespace_package("app.etl_pipeline.cache.eviction", _CACHE_DIR / "eviction")

View file

@ -0,0 +1,88 @@
"""What is allowed into the cache -- the gating rules, as pure logic.
These rules decide whether a given upload may be served from / written to the
parse cache. They live in a pure predicate so every branch (disabled, vision,
no service, file category) is covered here without touching DB, storage, or the
parser.
"""
from __future__ import annotations
import pytest
from app.etl_pipeline.cache.eligibility import is_parse_cacheable
pytestmark = pytest.mark.unit
def test_document_with_service_and_cache_on_is_cacheable():
assert is_parse_cacheable(
filename="report.pdf",
etl_service="LLAMACLOUD",
cache_enabled=True,
has_vision_llm=False,
)
def test_disabled_cache_is_never_cacheable():
assert not is_parse_cacheable(
filename="report.pdf",
etl_service="LLAMACLOUD",
cache_enabled=False,
has_vision_llm=False,
)
def test_vision_llm_run_is_not_cacheable():
# Vision appends model output not captured by the key; sharing it would leak
# one run's generated text into a plain parse of the same bytes.
assert not is_parse_cacheable(
filename="report.pdf",
etl_service="LLAMACLOUD",
cache_enabled=True,
has_vision_llm=True,
)
@pytest.mark.parametrize("etl_service", [None, ""])
def test_missing_etl_service_is_not_cacheable(etl_service):
assert not is_parse_cacheable(
filename="report.pdf",
etl_service=etl_service,
cache_enabled=True,
has_vision_llm=False,
)
@pytest.mark.parametrize(
"filename",
["paper.pdf", "memo.docx", "slides.pptx", "sheet.xlsx", "book.epub"],
)
def test_document_extensions_are_cacheable(filename):
assert is_parse_cacheable(
filename=filename,
etl_service="LLAMACLOUD",
cache_enabled=True,
has_vision_llm=False,
)
@pytest.mark.parametrize(
"filename",
[
"notes.txt", # plaintext
"readme.md", # plaintext
"main.py", # plaintext
"podcast.mp3", # audio
"photo.png", # image (vision path / fallback, not a shared doc parse)
"data.csv", # direct-convert
"archive.xyz", # unsupported
],
)
def test_non_document_categories_are_not_cacheable(filename):
assert not is_parse_cacheable(
filename=filename,
etl_service="LLAMACLOUD",
cache_enabled=True,
has_vision_llm=False,
)

View file

@ -0,0 +1,76 @@
"""Size-based eviction: drop just enough of the coldest entries to fit budget.
The caller supplies candidates already ordered coldest-first; this pure rule only
decides how far down that list to cut. It must never over-evict (stop as soon as
the footprint fits) and never promise more than the candidates can free.
"""
from __future__ import annotations
from datetime import UTC, datetime
import pytest
from app.etl_pipeline.cache.eviction.policy import select_over_budget
from app.etl_pipeline.cache.schemas import EvictionCandidate
pytestmark = pytest.mark.unit
def _candidate(id_: int, size_bytes: int) -> EvictionCandidate:
return EvictionCandidate(
id=id_,
storage_key=f"etl_cache/{id_}.md",
size_bytes=size_bytes,
last_used_at=datetime(2026, 1, 1, tzinfo=UTC),
times_reused=0,
)
def test_over_budget_drops_coldest_until_it_fits():
# 300 used, budget 100 -> must free >=200. Coldest-first [120, 90, 70];
# 120+90=210 >=200, so the third (70) is spared.
coldest_first = [_candidate(1, 120), _candidate(2, 90), _candidate(3, 70)]
chosen = select_over_budget(
coldest_first, current_total_bytes=300, max_total_bytes=100
)
assert [c.id for c in chosen] == [1, 2]
@pytest.mark.parametrize("current_total_bytes", [100, 80])
def test_within_budget_evicts_nothing(current_total_bytes):
# At or under budget there is nothing to free, so no blob is touched.
coldest_first = [_candidate(1, 50), _candidate(2, 50)]
chosen = select_over_budget(
coldest_first,
current_total_bytes=current_total_bytes,
max_total_bytes=100,
)
assert chosen == []
def test_stops_as_soon_as_one_entry_covers_the_overage():
# Only 10 over budget; the first (cold) entry already frees enough.
coldest_first = [_candidate(1, 40), _candidate(2, 40)]
chosen = select_over_budget(
coldest_first, current_total_bytes=110, max_total_bytes=100
)
assert [c.id for c in chosen] == [1]
def test_returns_all_candidates_when_they_cannot_free_enough():
# Deficit is 500 but candidates only total 150: return everything available
# rather than looping forever or raising.
coldest_first = [_candidate(1, 100), _candidate(2, 50)]
chosen = select_over_budget(
coldest_first, current_total_bytes=600, max_total_bytes=100
)
assert [c.id for c in chosen] == [1, 2]

View file

@ -0,0 +1,70 @@
"""Content-addressing: equal (bytes + recipe) must map to one storage location.
This is the dedup guarantee the whole cache rests on -- two users uploading the
same file under the same parser settings have to land on the same object key, and
any change to bytes or recipe has to land somewhere else.
"""
from __future__ import annotations
import pytest
from app.etl_pipeline.cache.schemas import ParseKey
from app.etl_pipeline.cache.storage.object_keys import (
CACHE_PREFIX,
build_parse_object_key,
)
pytestmark = pytest.mark.unit
def _key(**overrides) -> ParseKey:
base = {
"source_sha256": "a" * 64,
"etl_service": "LLAMACLOUD",
"mode": "basic",
"version": 1,
}
base.update(overrides)
return ParseKey.for_document(
base["source_sha256"],
etl_service=base["etl_service"],
mode=base["mode"],
version=base["version"],
)
def test_same_bytes_and_recipe_produce_the_same_object_key():
assert build_parse_object_key(_key()) == build_parse_object_key(_key())
def test_different_bytes_produce_different_object_keys():
assert build_parse_object_key(
_key(source_sha256="a" * 64)
) != build_parse_object_key(_key(source_sha256="b" * 64))
@pytest.mark.parametrize(
"field, value",
[
("etl_service", "DOCLING"),
("mode", "premium"),
("version", 2),
],
)
def test_any_recipe_change_produces_a_different_object_key(field, value):
# Same bytes but a different parser/mode/version must not collide: the recipe
# is part of the identity, so changing it has to re-parse, not reuse.
assert build_parse_object_key(_key()) != build_parse_object_key(
_key(**{field: value})
)
def test_object_key_is_prefixed_and_sharded_by_source_hash():
# Shape matters operationally: a dedicated top-level prefix keeps cache blobs
# out of the normal store, and the sha directory groups every recipe variant
# of one file together.
key = _key()
assert build_parse_object_key(key) == (
f"{CACHE_PREFIX}/{key.source_sha256}/LLAMACLOUD.basic.v1.md"
)

View file

@ -0,0 +1,28 @@
"""Stub the cache package __init__s so unit tests import only pure leaf modules.
The real ``cache``/``storage``/``eviction``/``persistence`` __init__s eagerly
import the facade, file storage, Celery, and ``app.db`` -- none of which a pure
unit test should need. Turning those packages into bare namespace packages lets
``from app.indexing_pipeline.cache.<leaf> import ...`` resolve the leaf module
without running the heavy __init__. ``schemas`` is left real (it is pure).
"""
import sys
import types
from pathlib import Path
_CACHE_DIR = Path(__file__).resolve().parents[4] / "app" / "indexing_pipeline" / "cache"
def _stub_namespace_package(dotted: str, fs_dir: Path) -> None:
if dotted in sys.modules:
return
module = types.ModuleType(dotted)
module.__path__ = [str(fs_dir)]
module.__package__ = dotted
sys.modules[dotted] = module
_stub_namespace_package("app.indexing_pipeline.cache", _CACHE_DIR)
_stub_namespace_package("app.indexing_pipeline.cache.storage", _CACHE_DIR / "storage")
_stub_namespace_package("app.indexing_pipeline.cache.eviction", _CACHE_DIR / "eviction")

View file

@ -0,0 +1,28 @@
from app.indexing_pipeline.cache.eligibility import is_embedding_cacheable
def test_disabled_cache_is_never_cacheable():
assert not is_embedding_cacheable(
cache_enabled=False, embedding_model="m", embedding_dim=384
)
def test_missing_model_is_not_cacheable():
assert not is_embedding_cacheable(
cache_enabled=True, embedding_model=None, embedding_dim=384
)
def test_missing_dimension_is_not_cacheable():
assert not is_embedding_cacheable(
cache_enabled=True, embedding_model="m", embedding_dim=None
)
assert not is_embedding_cacheable(
cache_enabled=True, embedding_model="m", embedding_dim=0
)
def test_enabled_with_model_and_dim_is_cacheable():
assert is_embedding_cacheable(
cache_enabled=True, embedding_model="m", embedding_dim=384
)

View file

@ -0,0 +1,31 @@
from app.indexing_pipeline.cache.schemas import EmbeddingKey
def _key(**overrides) -> EmbeddingKey:
base = {
"markdown_sha256": "a" * 64,
"embedding_model": "openai://text-embedding-3-small",
"embedding_dim": 1536,
"chunker_kind": "hybrid",
"chunker_version": 1,
}
base.update(overrides)
return EmbeddingKey(**base)
def test_object_suffix_is_stable():
assert _key().object_suffix == _key().object_suffix
def test_object_suffix_differs_by_model():
assert _key().object_suffix != _key(embedding_model="local/minilm").object_suffix
def test_object_suffix_differs_by_chunker_kind_and_version():
assert _key().object_suffix != _key(chunker_kind="code").object_suffix
assert _key().object_suffix != _key(chunker_version=2).object_suffix
def test_object_suffix_encodes_kind_and_version():
suffix = _key(chunker_kind="code", chunker_version=3).object_suffix
assert suffix.endswith(".code.v3.emb")

View file

@ -0,0 +1,52 @@
import numpy as np
import pytest
from app.indexing_pipeline.cache.schemas import CachedChunk, EmbeddingSet
from app.indexing_pipeline.cache.serialization import deserialize, serialize
def _make_set(dim: int, n_chunks: int) -> EmbeddingSet:
rng = np.random.default_rng(0)
return EmbeddingSet(
summary_embedding=rng.random(dim, dtype=np.float64),
chunks=[
CachedChunk(text=f"chunk {i}\nwith newline", embedding=rng.random(dim))
for i in range(n_chunks)
],
)
def test_round_trip_preserves_texts_and_vectors():
original = _make_set(dim=8, n_chunks=3)
restored = deserialize(serialize(original))
assert [c.text for c in restored.chunks] == [c.text for c in original.chunks]
assert restored.chunk_count == 3
assert np.allclose(restored.summary_embedding, original.summary_embedding, atol=1e-6)
for got, want in zip(restored.chunks, original.chunks, strict=True):
assert np.allclose(got.embedding, want.embedding, atol=1e-6)
def test_round_trip_with_no_chunks():
original = _make_set(dim=4, n_chunks=0)
restored = deserialize(serialize(original))
assert restored.chunk_count == 0
assert restored.summary_embedding.shape[0] == 4
def test_serialize_rejects_mismatched_dimensions():
bad = EmbeddingSet(
summary_embedding=np.zeros(4, dtype=np.float32),
chunks=[CachedChunk(text="x", embedding=np.zeros(8, dtype=np.float32))],
)
with pytest.raises(ValueError):
serialize(bad)
def test_deserialize_rejects_foreign_blob():
with pytest.raises(ValueError):
deserialize(b"not-a-surfsense-blob")

View file

@ -0,0 +1,94 @@
"""reconcile(): diff existing chunk rows against new chunk texts.
The reconciler decides which rows (and embeddings) survive an edit, which texts
must be embedded, and which rows go away -- purely from content, no DB.
"""
from __future__ import annotations
from app.indexing_pipeline.chunk_reconciler import ExistingChunk, reconcile
def _existing(*contents: str) -> list[ExistingChunk]:
return [
ExistingChunk(id=i + 1, content=text, position=i)
for i, text in enumerate(contents)
]
def test_identical_content_keeps_every_row_untouched():
plan = reconcile(_existing("alpha", "beta", "gamma"), ["alpha", "beta", "gamma"])
assert plan.to_embed == []
assert plan.to_delete == []
assert plan.reused == []
def test_head_insert_embeds_only_the_new_chunk_and_shifts_the_rest():
plan = reconcile(_existing("alpha", "beta"), ["intro", "alpha", "beta"])
assert plan.to_embed == [(0, "intro")]
assert plan.to_delete == []
# alpha: position 0 -> 1, beta: 1 -> 2; embeddings untouched.
assert plan.reused == [(1, 1), (2, 2)]
def test_middle_edit_swaps_exactly_one_chunk():
plan = reconcile(
_existing("alpha", "beta", "gamma"), ["alpha", "beta EDITED", "gamma"]
)
assert plan.to_embed == [(1, "beta EDITED")]
assert plan.to_delete == [2]
# Neighbours did not move, so no position writes at all.
assert plan.reused == []
def test_removed_chunk_is_deleted_and_followers_shift_up():
plan = reconcile(_existing("alpha", "beta", "gamma"), ["alpha", "gamma"])
assert plan.to_embed == []
assert plan.to_delete == [2]
assert plan.reused == [(3, 1)]
def test_duplicate_texts_pair_up_one_to_one():
# Two identical boilerplate chunks, only one survives the edit: exactly one
# row is kept and exactly one is deleted -- never both kept or both dropped.
plan = reconcile(_existing("boiler", "boiler", "body"), ["boiler", "body"])
assert plan.to_embed == []
assert plan.to_delete == [2]
assert plan.reused == [(3, 1)]
def test_duplicate_growth_embeds_only_the_extra_copy():
plan = reconcile(_existing("boiler", "body"), ["boiler", "boiler", "body"])
assert plan.to_embed == [(1, "boiler")]
assert plan.to_delete == []
assert plan.reused == [(2, 2)]
def test_reorder_becomes_position_updates_with_no_embedding():
plan = reconcile(_existing("alpha", "beta"), ["beta", "alpha"])
assert plan.to_embed == []
assert plan.to_delete == []
assert sorted(plan.reused) == [(1, 1), (2, 0)]
def test_full_rewrite_replaces_everything():
plan = reconcile(_existing("alpha", "beta"), ["new one", "new two"])
assert plan.to_embed == [(0, "new one"), (1, "new two")]
assert sorted(plan.to_delete) == [1, 2]
assert plan.reused == []
def test_no_existing_chunks_embeds_all():
plan = reconcile([], ["alpha", "beta"])
assert plan.to_embed == [(0, "alpha"), (1, "beta")]
assert plan.to_delete == []
assert plan.reused == []

View file

@ -54,7 +54,7 @@ async def test_index_calls_embed_and_chunk_via_to_thread(
mock_chunk_hybrid = MagicMock(return_value=["chunk1"])
mock_chunk_hybrid.__name__ = "chunk_text_hybrid"
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.chunk_text_hybrid",
"app.indexing_pipeline.cache.cached_indexing.chunk_text_hybrid",
mock_chunk_hybrid,
)
mock_embed = MagicMock(
@ -62,7 +62,7 @@ async def test_index_calls_embed_and_chunk_via_to_thread(
)
mock_embed.__name__ = "embed_texts"
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.embed_texts",
"app.indexing_pipeline.cache.cached_indexing.embed_texts",
mock_embed,
)
# Bypass set_committed_value, which requires a real ORM instance (not MagicMock).
@ -102,17 +102,17 @@ async def test_non_code_documents_use_hybrid_chunker(
mock_chunk_hybrid = MagicMock(return_value=["chunk1"])
mock_chunk_hybrid.__name__ = "chunk_text_hybrid"
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.chunk_text_hybrid",
"app.indexing_pipeline.cache.cached_indexing.chunk_text_hybrid",
mock_chunk_hybrid,
)
mock_chunk_code = MagicMock(return_value=["chunk1"])
mock_chunk_code.__name__ = "chunk_text"
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.chunk_text",
"app.indexing_pipeline.cache.cached_indexing.chunk_text",
mock_chunk_code,
)
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.embed_texts",
"app.indexing_pipeline.cache.cached_indexing.embed_texts",
MagicMock(side_effect=lambda texts: [[0.1] * _EMBEDDING_DIM for _ in texts]),
)
monkeypatch.setattr(

View file

@ -105,8 +105,7 @@ async def test_ainvoke_propagates_quota_insufficient_error(monkeypatch):
async def _denying_billable_call(**_kwargs):
raise QuotaInsufficientError(
usage_type="vision_extraction",
used_micros=5_000_000,
limit_micros=5_000_000,
balance_micros=0,
remaining_micros=0,
)
yield # unreachable but required for asynccontextmanager type

View file

@ -98,8 +98,7 @@ async def _denying_billable_call(**kwargs):
_CALL_LOG.append(kwargs)
raise QuotaInsufficientError(
usage_type=kwargs.get("usage_type", "?"),
used_micros=5_000_000,
limit_micros=5_000_000,
balance_micros=0,
remaining_micros=0,
)
yield SimpleNamespace() # pragma: no cover