fix: plug all gaps found in deep review of indexing pipeline

CREDO23 2026-02-25 02:20:44 +02:00
parent 46c7ccd70b
commit 5b616eac5a
7 changed files with 183 additions and 6 deletions
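
Taken together, the new tests pin down the upsert contract of IndexingPipelineService.prepare_for_indexing: identical content is skipped even when it arrives under a different connector unique_id, changed content refreshes document_metadata and advances updated_at in place, and created_by_id survives the round trip. A minimal sketch of that contract, assuming a SHA-256 content hash stored on the row; the content_hash and unique_id columns on Document and the _content_hash helper are illustrative assumptions, not taken from this commit:

import hashlib
from datetime import datetime, timezone

from sqlalchemy import select


def _content_hash(markdown: str) -> str:
    # Illustrative helper: the dedupe key is the content itself, not the
    # connector id, which is why identical markdown from "source-b" is skipped.
    return hashlib.sha256(markdown.encode("utf-8")).hexdigest()


async def prepare_for_indexing(self, connector_docs):
    prepared = []
    for doc in connector_docs:
        digest = _content_hash(doc.source_markdown)

        # Skip when any document in the search space already carries this content.
        duplicate = await self.session.scalar(
            select(Document).filter(
                Document.search_space_id == doc.search_space_id,
                Document.content_hash == digest,  # assumed column
            )
        )
        if duplicate is not None:
            continue

        # Re-ingest of a known source with new content: update the row in place.
        row = await self.session.scalar(
            select(Document).filter(
                Document.search_space_id == doc.search_space_id,
                Document.unique_id == doc.unique_id,  # assumed column
            )
        )
        if row is None:
            row = Document(
                search_space_id=doc.search_space_id,
                unique_id=doc.unique_id,
                created_by_id=doc.created_by_id,
            )
            self.session.add(row)

        row.content_hash = digest
        row.document_metadata = doc.metadata
        row.updated_at = datetime.now(timezone.utc)  # set on create and on change
        prepared.append(row)

    await self.session.commit()
    return prepared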


@@ -110,6 +110,28 @@ async def test_embedding_written_to_db(
    assert len(reloaded.embedding) == 1024


@pytest.mark.usefixtures("patched_summarize", "patched_embed_text", "patched_chunk_text")
async def test_updated_at_advances_after_indexing(
    db_session, db_search_space, make_connector_document,
):
    connector_doc = make_connector_document(search_space_id=db_search_space.id)
    service = IndexingPipelineService(session=db_session)
    prepared = await service.prepare_for_indexing([connector_doc])
    document = prepared[0]
    document_id = document.id

    result = await db_session.execute(select(Document).filter(Document.id == document_id))
    updated_at_pending = result.scalars().first().updated_at

    await service.index(document, connector_doc, llm=None)

    result = await db_session.execute(select(Document).filter(Document.id == document_id))
    updated_at_ready = result.scalars().first().updated_at

    assert updated_at_ready > updated_at_pending


@pytest.mark.usefixtures("patched_summarize_raises", "patched_chunk_text")
async def test_llm_error_sets_status_failed(
    db_session, db_search_space, make_connector_document,
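
The hunk above implies that index() stamps updated_at when a document leaves the pending state and downgrades the status when summarization raises. A sketch of that shape, assuming summarize and embed_text are the collaborators the fixtures patch and that a DocumentStatus enum exists; every name beyond index() and updated_at is an assumption:

from datetime import datetime, timezone


async def index(self, document, connector_doc, llm=None):
    try:
        # patched_summarize / patched_embed_text stub these out in the tests.
        summary = await summarize(connector_doc.source_markdown, llm=llm)
        document.embedding = await embed_text(summary)
        document.status = DocumentStatus.READY
    except Exception:
        # patched_summarize_raises drives this branch.
        document.status = DocumentStatus.FAILED
    # Stamped on success and failure alike, so updated_at always advances.
    document.updated_at = datetime.now(timezone.utc)
    await self.session.commit()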


@@ -115,6 +115,112 @@ async def test_duplicate_in_batch_is_persisted_once(
    assert len(rows) == 1


async def test_created_by_id_is_persisted(
    db_session, db_user, db_search_space, make_connector_document
):
    doc = make_connector_document(
        search_space_id=db_search_space.id,
        created_by_id=str(db_user.id),
    )
    service = IndexingPipelineService(session=db_session)
    results = await service.prepare_for_indexing([doc])
    document_id = results[0].id

    result = await db_session.execute(select(Document).filter(Document.id == document_id))
    reloaded = result.scalars().first()
    assert str(reloaded.created_by_id) == str(db_user.id)


async def test_metadata_is_updated_when_content_changes(
    db_session, db_search_space, make_connector_document
):
    original = make_connector_document(
        search_space_id=db_search_space.id,
        source_markdown="## v1",
        metadata={"status": "in_progress"},
    )
    service = IndexingPipelineService(session=db_session)
    first = await service.prepare_for_indexing([original])
    document_id = first[0].id

    updated = make_connector_document(
        search_space_id=db_search_space.id,
        source_markdown="## v2",
        metadata={"status": "done"},
    )
    await service.prepare_for_indexing([updated])

    result = await db_session.execute(select(Document).filter(Document.id == document_id))
    reloaded = result.scalars().first()
    assert reloaded.document_metadata == {"status": "done"}


async def test_updated_at_advances_when_content_changes(
    db_session, db_search_space, make_connector_document
):
    original = make_connector_document(search_space_id=db_search_space.id, source_markdown="## v1")
    service = IndexingPipelineService(session=db_session)
    first = await service.prepare_for_indexing([original])
    document_id = first[0].id

    result = await db_session.execute(select(Document).filter(Document.id == document_id))
    updated_at_v1 = result.scalars().first().updated_at

    updated = make_connector_document(search_space_id=db_search_space.id, source_markdown="## v2")
    await service.prepare_for_indexing([updated])

    result = await db_session.execute(select(Document).filter(Document.id == document_id))
    updated_at_v2 = result.scalars().first().updated_at

    assert updated_at_v2 > updated_at_v1


async def test_updated_at_is_set_on_creation(
    db_session, db_search_space, make_connector_document
):
    doc = make_connector_document(search_space_id=db_search_space.id)
    service = IndexingPipelineService(session=db_session)
    results = await service.prepare_for_indexing([doc])
    document_id = results[0].id

    result = await db_session.execute(select(Document).filter(Document.id == document_id))
    reloaded = result.scalars().first()
    assert reloaded.updated_at is not None


async def test_same_content_from_different_source_is_skipped(
    db_session, db_search_space, make_connector_document
):
    first = make_connector_document(
        search_space_id=db_search_space.id,
        unique_id="source-a",
        source_markdown="## Shared content",
    )
    second = make_connector_document(
        search_space_id=db_search_space.id,
        unique_id="source-b",
        source_markdown="## Shared content",
    )
    service = IndexingPipelineService(session=db_session)
    await service.prepare_for_indexing([first])
    results = await service.prepare_for_indexing([second])
    assert results == []

    result = await db_session.execute(
        select(Document).filter(Document.search_space_id == db_search_space.id)
    )
    assert len(result.scalars().all()) == 1


async def test_title_and_content_change_updates_both_and_returns_document(
    db_session, db_search_space, make_connector_document
):
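
All of these tests lean on the make_connector_document factory fixture, which this diff does not show. A plausible conftest shape, with field names mirrored from the calls above; the ConnectorDocument payload type and every default value here are guesses:

import uuid
from dataclasses import dataclass

import pytest


@dataclass
class ConnectorDocument:
    # Assumed payload type; the real class is not visible in this diff.
    search_space_id: int
    unique_id: str
    source_markdown: str
    metadata: dict
    created_by_id: str | None = None


@pytest.fixture
def make_connector_document():
    def _make(
        search_space_id,
        unique_id=None,
        source_markdown="## Default content",
        metadata=None,
        created_by_id=None,
    ):
        return ConnectorDocument(
            search_space_id=search_space_id,
            unique_id=unique_id or str(uuid.uuid4()),  # unique per call unless pinned
            source_markdown=source_markdown,
            metadata=metadata or {},
            created_by_id=created_by_id,
        )

    return _make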