Merge remote-tracking branch 'upstream/dev' into feat/document-test

Anish Sarkar 2026-02-26 02:22:10 +05:30
commit f59a70f7a5
43 changed files with 2435 additions and 87 deletions


@@ -11,6 +11,8 @@ import httpx
import pytest
from dotenv import load_dotenv
from app.db import DocumentType
from app.indexing_pipeline.connector_document import ConnectorDocument
from tests.utils.helpers import (
BACKEND_URL,
TEST_EMAIL,
@@ -28,6 +30,11 @@ DATABASE_URL = os.environ.get(
).replace("postgresql+asyncpg://", "postgresql://")
# ---------------------------------------------------------------------------
# E2E / integration helpers (direct DB access)
# ---------------------------------------------------------------------------
async def _force_delete_documents_db(
search_space_id: int,
) -> int:
@@ -204,3 +211,44 @@ async def page_limits():
await _set_user_page_limits(
TEST_EMAIL, pages_used=original[0], pages_limit=original[1]
)
# ---------------------------------------------------------------------------
# Unit test fixtures
# ---------------------------------------------------------------------------
@pytest.fixture
def sample_user_id() -> str:
return "00000000-0000-0000-0000-000000000001"
@pytest.fixture
def sample_search_space_id() -> int:
return 1
@pytest.fixture
def sample_connector_id() -> int:
return 42
@pytest.fixture
def make_connector_document():
"""
Generic factory for unit tests. Overridden in tests/integration/conftest.py
with real DB-backed IDs for integration tests.
"""
def _make(**overrides):
defaults = {
"title": "Test Document",
"source_markdown": "## Heading\n\nSome content.",
"unique_id": "test-id-001",
"document_type": DocumentType.CLICKUP_CONNECTOR,
"search_space_id": 1,
"connector_id": 1,
"created_by_id": "00000000-0000-0000-0000-000000000001",
}
defaults.update(overrides)
return ConnectorDocument(**defaults)
return _make


@@ -0,0 +1,164 @@
import os
import uuid
from unittest.mock import AsyncMock, MagicMock
import pytest
import pytest_asyncio
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine
from sqlalchemy.pool import NullPool
from app.db import (
    Base,
    DocumentType,
    SearchSpace,
    SearchSourceConnector,
    SearchSourceConnectorType,
    User,
)
from app.indexing_pipeline.connector_document import ConnectorDocument
_EMBEDDING_DIM = 1024 # must match the Vector() dimension used in DB column creation
_DEFAULT_TEST_DB = "postgresql+asyncpg://postgres:postgres@localhost:5432/surfsense_test"
TEST_DATABASE_URL = os.environ.get("TEST_DATABASE_URL", _DEFAULT_TEST_DB)
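# NOTE: the target database needs the pgvector extension available; the
# session-scoped engine fixture below runs CREATE EXTENSION IF NOT EXISTS vector.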
@pytest_asyncio.fixture(scope="session")
async def async_engine():
engine = create_async_engine(
TEST_DATABASE_URL,
poolclass=NullPool,
echo=False,
# Required for asyncpg + savepoints: disables prepared statement cache
# to prevent "another operation is in progress" errors during savepoint rollbacks.
connect_args={"prepared_statement_cache_size": 0},
)
async with engine.begin() as conn:
await conn.execute(text("CREATE EXTENSION IF NOT EXISTS vector"))
await conn.run_sync(Base.metadata.create_all)
yield engine
# drop_all fails on circular FKs (new_chat_threads ↔ public_chat_snapshots).
# DROP SCHEMA CASCADE handles this without needing topological sort.
async with engine.begin() as conn:
await conn.execute(text("DROP SCHEMA public CASCADE"))
await conn.execute(text("CREATE SCHEMA public"))
await engine.dispose()
@pytest_asyncio.fixture
async def db_session(async_engine) -> AsyncSession:
# Bind the session to a connection that holds an outer transaction.
# join_transaction_mode="create_savepoint" makes session.commit() release
# a SAVEPOINT instead of committing the outer transaction, so the final
# transaction.rollback() undoes everything — including commits made by the
# service under test — leaving the DB clean for the next test.
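    # (This is the "Joining a Session into an External Transaction" recipe
    # from the SQLAlchemy docs.)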
async with async_engine.connect() as conn:
transaction = await conn.begin()
async with AsyncSession(
bind=conn,
expire_on_commit=False,
join_transaction_mode="create_savepoint",
) as session:
yield session
await transaction.rollback()
@pytest_asyncio.fixture
async def db_user(db_session: AsyncSession) -> User:
user = User(
id=uuid.uuid4(),
email="test@surfsense.net",
hashed_password="hashed",
is_active=True,
is_superuser=False,
is_verified=True,
)
db_session.add(user)
await db_session.flush()
return user
@pytest_asyncio.fixture
async def db_connector(
    db_session: AsyncSession, db_user: User, db_search_space: SearchSpace
) -> SearchSourceConnector:
connector = SearchSourceConnector(
name="Test Connector",
connector_type=SearchSourceConnectorType.CLICKUP_CONNECTOR,
config={},
search_space_id=db_search_space.id,
user_id=db_user.id,
)
db_session.add(connector)
await db_session.flush()
return connector
@pytest_asyncio.fixture
async def db_search_space(db_session: AsyncSession, db_user: User) -> SearchSpace:
space = SearchSpace(
name="Test Space",
user_id=db_user.id,
)
db_session.add(space)
await db_session.flush()
return space
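# ---------------------------------------------------------------------------
# Patched pipeline internals (summarizer, embedder, chunker)
# ---------------------------------------------------------------------------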
@pytest.fixture
def patched_summarize(monkeypatch) -> AsyncMock:
mock = AsyncMock(return_value="Mocked summary.")
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.summarize_document",
mock,
)
return mock
@pytest.fixture
def patched_summarize_raises(monkeypatch) -> AsyncMock:
mock = AsyncMock(side_effect=RuntimeError("LLM unavailable"))
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.summarize_document",
mock,
)
return mock
@pytest.fixture
def patched_embed_text(monkeypatch) -> MagicMock:
mock = MagicMock(return_value=[0.1] * _EMBEDDING_DIM)
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.embed_text",
mock,
)
return mock
@pytest.fixture
def patched_chunk_text(monkeypatch) -> MagicMock:
mock = MagicMock(return_value=["Test chunk content."])
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.chunk_text",
mock,
)
return mock
@pytest.fixture
def make_connector_document(db_connector, db_user):
"""Integration-scoped override: uses real DB connector and user IDs."""
def _make(**overrides):
defaults = {
"title": "Test Document",
"source_markdown": "## Heading\n\nSome content.",
"unique_id": "test-id-001",
"document_type": DocumentType.CLICKUP_CONNECTOR,
"search_space_id": db_connector.search_space_id,
"connector_id": db_connector.id,
"created_by_id": str(db_user.id),
}
defaults.update(overrides)
return ConnectorDocument(**defaults)
return _make


@@ -0,0 +1,91 @@
import pytest
from sqlalchemy import select
from app.db import Chunk, Document, DocumentStatus
from app.indexing_pipeline.adapters.file_upload_adapter import index_uploaded_file
pytestmark = pytest.mark.integration
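# The async tests in this suite carry no explicit @pytest.mark.asyncio, so
# pytest-asyncio's asyncio_mode = "auto" is presumably enabled in the project config.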
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_sets_status_ready(db_session, db_search_space, db_user, mocker):
"""Document status is READY after successful indexing."""
await index_uploaded_file(
markdown_content="## Hello\n\nSome content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
session=db_session,
llm=mocker.Mock(),
)
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
document = result.scalars().first()
assert DocumentStatus.is_state(document.status, DocumentStatus.READY)
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_content_is_summary(db_session, db_search_space, db_user, mocker):
"""Document content is set to the LLM-generated summary."""
await index_uploaded_file(
markdown_content="## Hello\n\nSome content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
session=db_session,
llm=mocker.Mock(),
)
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
document = result.scalars().first()
assert document.content == "Mocked summary."
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_chunks_written_to_db(db_session, db_search_space, db_user, mocker):
"""Chunks derived from the source markdown are persisted in the DB."""
await index_uploaded_file(
markdown_content="## Hello\n\nSome content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
session=db_session,
llm=mocker.Mock(),
)
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
document = result.scalars().first()
chunks_result = await db_session.execute(
select(Chunk).filter(Chunk.document_id == document.id)
)
chunks = chunks_result.scalars().all()
assert len(chunks) == 1
assert chunks[0].content == "Test chunk content."
@pytest.mark.usefixtures("patched_summarize_raises", "patched_embed_text", "patched_chunk_text")
async def test_raises_on_indexing_failure(db_session, db_search_space, db_user, mocker):
"""RuntimeError is raised when the indexing step fails so the caller can fire a failure notification."""
with pytest.raises(RuntimeError):
await index_uploaded_file(
markdown_content="## Hello\n\nSome content.",
filename="test.pdf",
etl_service="UNSTRUCTURED",
search_space_id=db_search_space.id,
user_id=str(db_user.id),
session=db_session,
llm=mocker.Mock(),
)


@@ -0,0 +1,266 @@
import pytest
from sqlalchemy import select
from app.db import Chunk, Document, DocumentStatus
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
pytestmark = pytest.mark.integration
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_sets_status_ready(
db_session, db_search_space, make_connector_document, mocker,
):
"""Document status is READY after successful indexing."""
connector_doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
await service.index(document, connector_doc, llm=mocker.Mock())
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert DocumentStatus.is_state(reloaded.status, DocumentStatus.READY)
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_content_is_summary_when_should_summarize_true(
db_session, db_search_space, make_connector_document, mocker,
):
"""Document content is set to the LLM-generated summary when should_summarize=True."""
connector_doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
await service.index(document, connector_doc, llm=mocker.Mock())
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert reloaded.content == "Mocked summary."
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_content_is_source_markdown_when_should_summarize_false(
db_session, db_search_space, make_connector_document,
):
"""Document content is set to source_markdown verbatim when should_summarize=False."""
connector_doc = make_connector_document(
search_space_id=db_search_space.id,
should_summarize=False,
source_markdown="## Raw content",
)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
await service.index(document, connector_doc, llm=None)
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert reloaded.content == "## Raw content"
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_chunks_written_to_db(
db_session, db_search_space, make_connector_document, mocker,
):
"""Chunks derived from source_markdown are persisted in the DB."""
connector_doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
await service.index(document, connector_doc, llm=mocker.Mock())
result = await db_session.execute(
select(Chunk).filter(Chunk.document_id == document_id)
)
chunks = result.scalars().all()
assert len(chunks) == 1
assert chunks[0].content == "Test chunk content."
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_embedding_written_to_db(
db_session, db_search_space, make_connector_document, mocker,
):
"""Document embedding vector is persisted in the DB after indexing."""
connector_doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
await service.index(document, connector_doc, llm=mocker.Mock())
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert reloaded.embedding is not None
assert len(reloaded.embedding) == 1024
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_updated_at_advances_after_indexing(
db_session, db_search_space, make_connector_document, mocker,
):
"""updated_at timestamp is later after indexing than it was at prepare time."""
connector_doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
result = await db_session.execute(select(Document).filter(Document.id == document_id))
updated_at_pending = result.scalars().first().updated_at
await service.index(document, connector_doc, llm=mocker.Mock())
result = await db_session.execute(select(Document).filter(Document.id == document_id))
updated_at_ready = result.scalars().first().updated_at
assert updated_at_ready > updated_at_pending
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_no_llm_falls_back_to_source_markdown(
db_session, db_search_space, make_connector_document,
):
"""When llm=None and no fallback_summary, content falls back to source_markdown."""
connector_doc = make_connector_document(
search_space_id=db_search_space.id,
should_summarize=True,
source_markdown="## Fallback content",
)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
await service.index(document, connector_doc, llm=None)
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert DocumentStatus.is_state(reloaded.status, DocumentStatus.READY)
assert reloaded.content == "## Fallback content"
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_fallback_summary_used_when_llm_unavailable(
db_session, db_search_space, make_connector_document,
):
"""fallback_summary is used as content when llm=None and should_summarize=True."""
connector_doc = make_connector_document(
search_space_id=db_search_space.id,
should_summarize=True,
source_markdown="## Full raw content",
fallback_summary="Short pre-built summary.",
)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document_id = prepared[0].id
await service.index(prepared[0], connector_doc, llm=None)
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert DocumentStatus.is_state(reloaded.status, DocumentStatus.READY)
assert reloaded.content == "Short pre-built summary."
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_reindex_replaces_old_chunks(
db_session, db_search_space, make_connector_document, mocker,
):
"""Re-indexing a document replaces its old chunks rather than appending."""
connector_doc = make_connector_document(
search_space_id=db_search_space.id,
source_markdown="## v1",
)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
await service.index(document, connector_doc, llm=mocker.Mock())
updated_doc = make_connector_document(
search_space_id=db_search_space.id,
source_markdown="## v2",
)
re_prepared = await service.prepare_for_indexing([updated_doc])
await service.index(re_prepared[0], updated_doc, llm=mocker.Mock())
result = await db_session.execute(
select(Chunk).filter(Chunk.document_id == document_id)
)
chunks = result.scalars().all()
assert len(chunks) == 1
@pytest.mark.usefixtures("patched_summarize_raises", "patched_embed_text", "patched_chunk_text")
async def test_llm_error_sets_status_failed(
db_session, db_search_space, make_connector_document, mocker,
):
"""Document status is FAILED when the LLM raises during indexing."""
connector_doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
await service.index(document, connector_doc, llm=mocker.Mock())
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert DocumentStatus.is_state(reloaded.status, DocumentStatus.FAILED)
@pytest.mark.usefixtures("patched_summarize_raises", "patched_embed_text", "patched_chunk_text")
async def test_llm_error_leaves_no_partial_data(
db_session, db_search_space, make_connector_document, mocker,
):
"""A failed indexing attempt leaves no partial embedding or chunks in the DB."""
connector_doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([connector_doc])
document = prepared[0]
document_id = document.id
await service.index(document, connector_doc, llm=mocker.Mock())
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert reloaded.embedding is None
assert reloaded.content == "Pending..."
chunks_result = await db_session.execute(
select(Chunk).filter(Chunk.document_id == document_id)
)
assert chunks_result.scalars().all() == []


@@ -0,0 +1,377 @@
import pytest
from sqlalchemy import select
from app.db import Document, DocumentStatus
from app.indexing_pipeline.document_hashing import compute_content_hash as real_compute_content_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
pytestmark = pytest.mark.integration
async def test_new_document_is_persisted_with_pending_status(
db_session, db_search_space, make_connector_document
):
"""A new document is created in the DB with PENDING status and correct markdown."""
doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
results = await service.prepare_for_indexing([doc])
assert len(results) == 1
document_id = results[0].id
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert reloaded is not None
assert DocumentStatus.is_state(reloaded.status, DocumentStatus.PENDING)
assert reloaded.source_markdown == doc.source_markdown
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_unchanged_ready_document_is_skipped(
db_session, db_search_space, make_connector_document, mocker,
):
"""A READY document with unchanged content is not returned for re-indexing."""
doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
# Index fully so the document reaches ready state
prepared = await service.prepare_for_indexing([doc])
await service.index(prepared[0], doc, llm=mocker.Mock())
# Same content on the next run — a ready document must be skipped
results = await service.prepare_for_indexing([doc])
assert results == []
@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_title_only_change_updates_title_in_db(
db_session, db_search_space, make_connector_document, mocker,
):
"""A title-only change updates the DB title without re-queuing the document."""
original = make_connector_document(search_space_id=db_search_space.id, title="Original Title")
service = IndexingPipelineService(session=db_session)
prepared = await service.prepare_for_indexing([original])
document_id = prepared[0].id
await service.index(prepared[0], original, llm=mocker.Mock())
renamed = make_connector_document(search_space_id=db_search_space.id, title="Updated Title")
results = await service.prepare_for_indexing([renamed])
assert results == []
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert reloaded.title == "Updated Title"
async def test_changed_content_is_returned_for_reprocessing(
db_session, db_search_space, make_connector_document
):
"""A document with changed content is returned for re-indexing with updated markdown."""
original = make_connector_document(search_space_id=db_search_space.id, source_markdown="## v1")
service = IndexingPipelineService(session=db_session)
first = await service.prepare_for_indexing([original])
original_id = first[0].id
updated = make_connector_document(search_space_id=db_search_space.id, source_markdown="## v2")
results = await service.prepare_for_indexing([updated])
assert len(results) == 1
assert results[0].id == original_id
result = await db_session.execute(select(Document).filter(Document.id == original_id))
reloaded = result.scalars().first()
assert reloaded.source_markdown == "## v2"
assert DocumentStatus.is_state(reloaded.status, DocumentStatus.PENDING)
async def test_all_documents_in_batch_are_persisted(
db_session, db_search_space, make_connector_document
):
"""All documents in a batch are persisted and returned."""
docs = [
make_connector_document(search_space_id=db_search_space.id, unique_id="id-1", title="Doc 1", source_markdown="## Content 1"),
make_connector_document(search_space_id=db_search_space.id, unique_id="id-2", title="Doc 2", source_markdown="## Content 2"),
make_connector_document(search_space_id=db_search_space.id, unique_id="id-3", title="Doc 3", source_markdown="## Content 3"),
]
service = IndexingPipelineService(session=db_session)
results = await service.prepare_for_indexing(docs)
assert len(results) == 3
result = await db_session.execute(select(Document).filter(Document.search_space_id == db_search_space.id))
rows = result.scalars().all()
assert len(rows) == 3
async def test_duplicate_in_batch_is_persisted_once(
db_session, db_search_space, make_connector_document
):
"""The same document passed twice in a batch is only persisted once."""
doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
results = await service.prepare_for_indexing([doc, doc])
assert len(results) == 1
result = await db_session.execute(select(Document).filter(Document.search_space_id == db_search_space.id))
rows = result.scalars().all()
assert len(rows) == 1
async def test_created_by_id_is_persisted(
db_session, db_user, db_search_space, make_connector_document
):
"""created_by_id from the connector document is persisted on the DB row."""
doc = make_connector_document(
search_space_id=db_search_space.id,
created_by_id=str(db_user.id),
)
service = IndexingPipelineService(session=db_session)
results = await service.prepare_for_indexing([doc])
document_id = results[0].id
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert str(reloaded.created_by_id) == str(db_user.id)
async def test_metadata_is_updated_when_content_changes(
db_session, db_search_space, make_connector_document
):
"""document_metadata is overwritten with the latest metadata when content changes."""
original = make_connector_document(
search_space_id=db_search_space.id,
source_markdown="## v1",
metadata={"status": "in_progress"},
)
service = IndexingPipelineService(session=db_session)
first = await service.prepare_for_indexing([original])
document_id = first[0].id
updated = make_connector_document(
search_space_id=db_search_space.id,
source_markdown="## v2",
metadata={"status": "done"},
)
await service.prepare_for_indexing([updated])
result = await db_session.execute(select(Document).filter(Document.id == document_id))
reloaded = result.scalars().first()
assert reloaded.document_metadata == {"status": "done"}
async def test_updated_at_advances_when_title_only_changes(
db_session, db_search_space, make_connector_document
):
"""updated_at advances even when only the title changes."""
original = make_connector_document(search_space_id=db_search_space.id, title="Old Title")
service = IndexingPipelineService(session=db_session)
first = await service.prepare_for_indexing([original])
document_id = first[0].id
result = await db_session.execute(select(Document).filter(Document.id == document_id))
updated_at_v1 = result.scalars().first().updated_at
renamed = make_connector_document(search_space_id=db_search_space.id, title="New Title")
await service.prepare_for_indexing([renamed])
result = await db_session.execute(select(Document).filter(Document.id == document_id))
updated_at_v2 = result.scalars().first().updated_at
assert updated_at_v2 > updated_at_v1
async def test_updated_at_advances_when_content_changes(
db_session, db_search_space, make_connector_document
):
"""updated_at advances when document content changes."""
original = make_connector_document(search_space_id=db_search_space.id, source_markdown="## v1")
service = IndexingPipelineService(session=db_session)
first = await service.prepare_for_indexing([original])
document_id = first[0].id
result = await db_session.execute(select(Document).filter(Document.id == document_id))
updated_at_v1 = result.scalars().first().updated_at
updated = make_connector_document(search_space_id=db_search_space.id, source_markdown="## v2")
await service.prepare_for_indexing([updated])
result = await db_session.execute(select(Document).filter(Document.id == document_id))
updated_at_v2 = result.scalars().first().updated_at
assert updated_at_v2 > updated_at_v1
async def test_same_content_from_different_source_skipped_in_single_batch(
db_session, db_search_space, make_connector_document
):
"""Two documents with identical content in the same batch result in only one being persisted."""
first = make_connector_document(
search_space_id=db_search_space.id,
unique_id="source-a",
source_markdown="## Shared content",
)
second = make_connector_document(
search_space_id=db_search_space.id,
unique_id="source-b",
source_markdown="## Shared content",
)
service = IndexingPipelineService(session=db_session)
results = await service.prepare_for_indexing([first, second])
assert len(results) == 1
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
assert len(result.scalars().all()) == 1
async def test_same_content_from_different_source_is_skipped(
db_session, db_search_space, make_connector_document
):
"""A document with content identical to an already-indexed document is skipped."""
first = make_connector_document(
search_space_id=db_search_space.id,
unique_id="source-a",
source_markdown="## Shared content",
)
second = make_connector_document(
search_space_id=db_search_space.id,
unique_id="source-b",
source_markdown="## Shared content",
)
service = IndexingPipelineService(session=db_session)
await service.prepare_for_indexing([first])
results = await service.prepare_for_indexing([second])
assert results == []
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
assert len(result.scalars().all()) == 1
@pytest.mark.usefixtures("patched_summarize_raises", "patched_embed_text", "patched_chunk_text")
async def test_failed_document_with_unchanged_content_is_requeued(
db_session, db_search_space, make_connector_document, mocker,
):
"""A FAILED document with unchanged content is re-queued as PENDING on the next run."""
doc = make_connector_document(search_space_id=db_search_space.id)
service = IndexingPipelineService(session=db_session)
# First run: document is created and indexing crashes → status = failed
prepared = await service.prepare_for_indexing([doc])
document_id = prepared[0].id
await service.index(prepared[0], doc, llm=mocker.Mock())
result = await db_session.execute(select(Document).filter(Document.id == document_id))
assert DocumentStatus.is_state(result.scalars().first().status, DocumentStatus.FAILED)
# Next run: same content, pipeline must re-queue the failed document
results = await service.prepare_for_indexing([doc])
assert len(results) == 1
assert results[0].id == document_id
result = await db_session.execute(select(Document).filter(Document.id == document_id))
assert DocumentStatus.is_state(result.scalars().first().status, DocumentStatus.PENDING)
async def test_title_and_content_change_updates_both_and_returns_document(
db_session, db_search_space, make_connector_document
):
"""When both title and content change, both are updated and the document is returned for re-indexing."""
original = make_connector_document(
search_space_id=db_search_space.id,
title="Original Title",
source_markdown="## v1",
)
service = IndexingPipelineService(session=db_session)
first = await service.prepare_for_indexing([original])
original_id = first[0].id
updated = make_connector_document(
search_space_id=db_search_space.id,
title="Updated Title",
source_markdown="## v2",
)
results = await service.prepare_for_indexing([updated])
assert len(results) == 1
assert results[0].id == original_id
result = await db_session.execute(select(Document).filter(Document.id == original_id))
reloaded = result.scalars().first()
assert reloaded.title == "Updated Title"
assert reloaded.source_markdown == "## v2"
async def test_one_bad_document_in_batch_does_not_prevent_others_from_being_persisted(
db_session, db_search_space, make_connector_document, monkeypatch,
):
"""
A per-document error during prepare_for_indexing must be isolated.
The two valid documents around the failing one must still be persisted.
"""
docs = [
make_connector_document(
search_space_id=db_search_space.id,
unique_id="good-1",
source_markdown="## Good doc 1",
),
make_connector_document(
search_space_id=db_search_space.id,
unique_id="will-fail",
source_markdown="## Bad doc",
),
make_connector_document(
search_space_id=db_search_space.id,
unique_id="good-2",
source_markdown="## Good doc 2",
),
]
def compute_content_hash_with_error(doc):
if doc.unique_id == "will-fail":
raise RuntimeError("Simulated per-document failure")
return real_compute_content_hash(doc)
monkeypatch.setattr(
"app.indexing_pipeline.indexing_pipeline_service.compute_content_hash",
compute_content_hash_with_error,
)
service = IndexingPipelineService(session=db_session)
results = await service.prepare_for_indexing(docs)
assert len(results) == 2
result = await db_session.execute(
select(Document).filter(Document.search_space_id == db_search_space.id)
)
assert len(result.scalars().all()) == 2


@@ -0,0 +1,33 @@
import pytest
from unittest.mock import AsyncMock, MagicMock
@pytest.fixture
def patched_summarizer_chain(monkeypatch):
chain = MagicMock()
chain.ainvoke = AsyncMock(return_value=MagicMock(content="The summary."))
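    # The summarizer presumably builds its chain as SUMMARY_PROMPT_TEMPLATE | llm,
    # so stubbing __or__ on the template lets this mock intercept the whole chain.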
template = MagicMock()
template.__or__ = MagicMock(return_value=chain)
monkeypatch.setattr(
"app.indexing_pipeline.document_summarizer.SUMMARY_PROMPT_TEMPLATE",
template,
)
return chain
@pytest.fixture
def patched_chunker_instance(monkeypatch):
mock = MagicMock()
mock.chunk.return_value = [MagicMock(text="prose chunk")]
monkeypatch.setattr("app.indexing_pipeline.document_chunker.config.chunker_instance", mock)
return mock
@pytest.fixture
def patched_code_chunker_instance(monkeypatch):
mock = MagicMock()
mock.chunk.return_value = [MagicMock(text="code chunk")]
monkeypatch.setattr("app.indexing_pipeline.document_chunker.config.code_chunker_instance", mock)
return mock


@@ -0,0 +1,112 @@
import pytest
from pydantic import ValidationError
from app.db import DocumentType
from app.indexing_pipeline.connector_document import ConnectorDocument
def test_valid_document_created_with_required_fields():
"""All optional fields default correctly when only required fields are supplied."""
doc = ConnectorDocument(
title="Task",
source_markdown="## Task\n\nSome content.",
unique_id="task-1",
document_type=DocumentType.CLICKUP_CONNECTOR,
search_space_id=1,
connector_id=42,
created_by_id="00000000-0000-0000-0000-000000000001",
)
assert doc.should_summarize is True
assert doc.should_use_code_chunker is False
assert doc.metadata == {}
assert doc.connector_id == 42
assert doc.created_by_id == "00000000-0000-0000-0000-000000000001"
def test_omitting_created_by_id_raises():
"""Omitting created_by_id raises a validation error."""
with pytest.raises(ValidationError):
ConnectorDocument(
title="Task",
source_markdown="## Content",
unique_id="task-1",
document_type=DocumentType.CLICKUP_CONNECTOR,
search_space_id=1,
connector_id=42,
)
def test_empty_source_markdown_raises():
"""Empty source_markdown raises a validation error."""
with pytest.raises(ValidationError):
ConnectorDocument(
title="Task",
source_markdown="",
unique_id="task-1",
document_type=DocumentType.CLICKUP_CONNECTOR,
            search_space_id=1,
            # supply the remaining required fields so that the empty
            # source_markdown is the only invalid input
            connector_id=42,
            created_by_id="00000000-0000-0000-0000-000000000001",
        )
def test_whitespace_only_source_markdown_raises():
"""Whitespace-only source_markdown raises a validation error."""
with pytest.raises(ValidationError):
ConnectorDocument(
title="Task",
source_markdown=" \n\t ",
unique_id="task-1",
document_type=DocumentType.CLICKUP_CONNECTOR,
            search_space_id=1,
            connector_id=42,
            created_by_id="00000000-0000-0000-0000-000000000001",
        )
def test_empty_title_raises():
"""Empty title raises a validation error."""
with pytest.raises(ValidationError):
ConnectorDocument(
title="",
source_markdown="## Content",
unique_id="task-1",
document_type=DocumentType.CLICKUP_CONNECTOR,
            search_space_id=1,
            connector_id=42,
            created_by_id="00000000-0000-0000-0000-000000000001",
        )
def test_empty_created_by_id_raises():
"""Empty created_by_id raises a validation error."""
with pytest.raises(ValidationError):
ConnectorDocument(
title="Task",
source_markdown="## Content",
unique_id="task-1",
document_type=DocumentType.CLICKUP_CONNECTOR,
search_space_id=1,
connector_id=42,
created_by_id="",
)
def test_zero_search_space_id_raises():
"""search_space_id of zero raises a validation error."""
with pytest.raises(ValidationError):
ConnectorDocument(
title="Task",
source_markdown="## Content",
unique_id="task-1",
document_type=DocumentType.CLICKUP_CONNECTOR,
search_space_id=0,
connector_id=42,
created_by_id="00000000-0000-0000-0000-000000000001",
)
def test_empty_unique_id_raises():
"""Empty unique_id raises a validation error."""
with pytest.raises(ValidationError):
ConnectorDocument(
title="Task",
source_markdown="## Content",
unique_id="",
document_type=DocumentType.CLICKUP_CONNECTOR,
            search_space_id=1,
            connector_id=42,
            created_by_id="00000000-0000-0000-0000-000000000001",
        )


@@ -0,0 +1,21 @@
import pytest
from app.indexing_pipeline.document_chunker import chunk_text
pytestmark = pytest.mark.unit
@pytest.mark.usefixtures("patched_chunker_instance", "patched_code_chunker_instance")
def test_uses_code_chunker_when_flag_is_true():
"""Code chunker is selected when use_code_chunker=True."""
result = chunk_text("def foo(): pass", use_code_chunker=True)
assert result == ["code chunk"]
@pytest.mark.usefixtures("patched_chunker_instance", "patched_code_chunker_instance")
def test_uses_default_chunker_when_flag_is_false():
"""Default prose chunker is selected when use_code_chunker=False."""
result = chunk_text("Some prose text.", use_code_chunker=False)
assert result == ["prose chunk"]


@@ -0,0 +1,48 @@
import pytest
from app.db import DocumentType
from app.indexing_pipeline.document_hashing import compute_content_hash, compute_unique_identifier_hash
pytestmark = pytest.mark.unit
def test_different_unique_id_produces_different_hash(make_connector_document):
"""Two documents with different unique_ids produce different identifier hashes."""
doc_a = make_connector_document(unique_id="id-001")
doc_b = make_connector_document(unique_id="id-002")
assert compute_unique_identifier_hash(doc_a) != compute_unique_identifier_hash(doc_b)
def test_different_search_space_produces_different_identifier_hash(make_connector_document):
"""Same document in different search spaces produces different identifier hashes."""
doc_a = make_connector_document(search_space_id=1)
doc_b = make_connector_document(search_space_id=2)
assert compute_unique_identifier_hash(doc_a) != compute_unique_identifier_hash(doc_b)
def test_different_document_type_produces_different_identifier_hash(make_connector_document):
"""Same unique_id with different document types produces different identifier hashes."""
doc_a = make_connector_document(document_type=DocumentType.CLICKUP_CONNECTOR)
doc_b = make_connector_document(document_type=DocumentType.NOTION_CONNECTOR)
assert compute_unique_identifier_hash(doc_a) != compute_unique_identifier_hash(doc_b)
def test_same_content_same_space_produces_same_content_hash(make_connector_document):
"""Identical content in the same search space always produces the same content hash."""
doc_a = make_connector_document(source_markdown="Hello world", search_space_id=1)
doc_b = make_connector_document(source_markdown="Hello world", search_space_id=1)
assert compute_content_hash(doc_a) == compute_content_hash(doc_b)
def test_same_content_different_space_produces_different_content_hash(make_connector_document):
"""Identical content in different search spaces produces different content hashes."""
doc_a = make_connector_document(source_markdown="Hello world", search_space_id=1)
doc_b = make_connector_document(source_markdown="Hello world", search_space_id=2)
assert compute_content_hash(doc_a) != compute_content_hash(doc_b)
def test_different_content_produces_different_content_hash(make_connector_document):
"""Different source markdown produces different content hashes."""
doc_a = make_connector_document(source_markdown="Original content")
doc_b = make_connector_document(source_markdown="Updated content")
assert compute_content_hash(doc_a) != compute_content_hash(doc_b)


@@ -0,0 +1,42 @@
import pytest
from unittest.mock import MagicMock
from app.indexing_pipeline.document_summarizer import summarize_document
pytestmark = pytest.mark.unit
@pytest.mark.usefixtures("patched_summarizer_chain")
async def test_without_metadata_returns_raw_summary():
"""Summarizer returns the LLM output directly when no metadata is provided."""
result = await summarize_document("# Content", llm=MagicMock(model="gpt-4"))
assert result == "The summary."
@pytest.mark.usefixtures("patched_summarizer_chain")
async def test_with_metadata_includes_metadata_values_in_output():
"""Non-empty metadata values are prepended to the summary output."""
result = await summarize_document(
"# Content",
llm=MagicMock(model="gpt-4"),
metadata={"author": "Alice", "source": "Notion"},
)
assert "Alice" in result
assert "Notion" in result
@pytest.mark.usefixtures("patched_summarizer_chain")
async def test_with_metadata_omits_empty_fields_from_output():
"""Empty metadata fields are omitted from the summary output."""
result = await summarize_document(
"# Content",
llm=MagicMock(model="gpt-4"),
metadata={"author": "Alice", "description": ""},
)
assert "Alice" in result
assert "description" not in result.lower()