Mirror of https://github.com/MODSetter/SurfSense.git (synced 2026-04-26 17:26:23 +02:00)
feat: add integration tests for indexing pipeline components
- Introduced integration tests for Calendar, Drive, and Gmail indexers to ensure proper document creation and migration.
- Added tests for batch indexing functionality to validate the processing of multiple documents.
- Implemented tests for legacy document migration to verify updates to document types and hashes.
- Enhanced test coverage for the IndexingPipelineService to ensure robust functionality across various document types.
This commit is contained in:
parent f7b52470eb
commit 8c41fd91ba
7 changed files with 693 additions and 0 deletions
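For orientation, below is a minimal sketch of the flow these tests exercise. It is assembled only from names that appear in the diffs in this commit (ConnectorDocument fields, IndexingPipelineService methods, DocumentType values); it is not a verbatim excerpt of SurfSense code, and the db_session, IDs, and llm object are placeholders supplied by the caller.

# Sketch only: wiring inferred from the tests in this commit.
# Placeholders (assumptions): db_session, search_space_id, connector_id, user_id, llm.
from app.db import DocumentType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService


async def index_calendar_event(db_session, search_space_id, connector_id, user_id, llm):
    doc = ConnectorDocument(
        title="Event evt-1",
        source_markdown="## Calendar Event\n\nDetails for evt-1",
        unique_id="evt-1",
        document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=True,
        fallback_summary="Calendar: Event evt-1",
        metadata={"event_id": "evt-1", "document_type": "Google Calendar Event"},
    )

    service = IndexingPipelineService(session=db_session)
    # Rewrites any matching legacy Composio row to the native Google type and hash,
    # so the existing document is reused instead of duplicated (see migration tests).
    await service.migrate_legacy_docs([doc])
    # prepare_for_indexing() returns one prepared entry per ConnectorDocument;
    # index() then writes the READY Document row for it.
    prepared = await service.prepare_for_indexing([doc])
    await service.index(prepared[0], doc, llm=llm)
    # index_batch() performs both steps for a whole list of ConnectorDocuments.

The batch and migration tests below exercise index_batch() and migrate_legacy_docs() against the same service under these assumptions.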
@@ -0,0 +1,111 @@
"""Integration tests: Calendar indexer builds ConnectorDocuments that flow through the pipeline."""

import pytest
from sqlalchemy import select

from app.config import config as app_config
from app.db import Document, DocumentStatus, DocumentType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService

_EMBEDDING_DIM = app_config.embedding_model_instance.dimension

pytestmark = pytest.mark.integration


def _cal_doc(*, unique_id: str, search_space_id: int, connector_id: int, user_id: str) -> ConnectorDocument:
    return ConnectorDocument(
        title=f"Event {unique_id}",
        source_markdown=f"## Calendar Event\n\nDetails for {unique_id}",
        unique_id=unique_id,
        document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=True,
        fallback_summary=f"Calendar: Event {unique_id}",
        metadata={
            "event_id": unique_id,
            "start_time": "2025-01-15T10:00:00",
            "end_time": "2025-01-15T11:00:00",
            "document_type": "Google Calendar Event",
        },
    )


@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_calendar_pipeline_creates_ready_document(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """A Calendar ConnectorDocument flows through prepare + index to a READY document."""
    space_id = db_search_space.id
    doc = _cal_doc(
        unique_id="evt-1",
        search_space_id=space_id,
        connector_id=db_connector.id,
        user_id=str(db_user.id),
    )

    service = IndexingPipelineService(session=db_session)
    prepared = await service.prepare_for_indexing([doc])
    assert len(prepared) == 1

    await service.index(prepared[0], doc, llm=mocker.Mock())

    result = await db_session.execute(
        select(Document).filter(Document.search_space_id == space_id)
    )
    row = result.scalars().first()

    assert row is not None
    assert row.document_type == DocumentType.GOOGLE_CALENDAR_CONNECTOR
    assert DocumentStatus.is_state(row.status, DocumentStatus.READY)


@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_calendar_legacy_doc_migrated(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """A legacy Composio Calendar doc is migrated and reused."""
    space_id = db_search_space.id
    user_id = str(db_user.id)
    evt_id = "evt-legacy-cal"

    legacy_hash = compute_identifier_hash(
        DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR.value, evt_id, space_id
    )
    legacy_doc = Document(
        title="Old Calendar Event",
        document_type=DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
        content="old summary",
        content_hash=f"ch-{legacy_hash[:12]}",
        unique_identifier_hash=legacy_hash,
        source_markdown="## Old event",
        search_space_id=space_id,
        created_by_id=user_id,
        embedding=[0.1] * _EMBEDDING_DIM,
        status={"state": "ready"},
    )
    db_session.add(legacy_doc)
    await db_session.flush()
    original_id = legacy_doc.id

    connector_doc = _cal_doc(
        unique_id=evt_id,
        search_space_id=space_id,
        connector_id=db_connector.id,
        user_id=user_id,
    )

    service = IndexingPipelineService(session=db_session)
    await service.migrate_legacy_docs([connector_doc])

    result = await db_session.execute(select(Document).filter(Document.id == original_id))
    row = result.scalars().first()

    assert row.document_type == DocumentType.GOOGLE_CALENDAR_CONNECTOR
    native_hash = compute_identifier_hash(
        DocumentType.GOOGLE_CALENDAR_CONNECTOR.value, evt_id, space_id
    )
    assert row.unique_identifier_hash == native_hash
@@ -0,0 +1,110 @@
"""Integration tests: Drive indexer builds ConnectorDocuments that flow through the pipeline."""

import pytest
from sqlalchemy import select

from app.config import config as app_config
from app.db import Document, DocumentStatus, DocumentType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService

_EMBEDDING_DIM = app_config.embedding_model_instance.dimension

pytestmark = pytest.mark.integration


def _drive_doc(*, unique_id: str, search_space_id: int, connector_id: int, user_id: str) -> ConnectorDocument:
    return ConnectorDocument(
        title=f"File {unique_id}.pdf",
        source_markdown=f"## Document Content\n\nText from file {unique_id}",
        unique_id=unique_id,
        document_type=DocumentType.GOOGLE_DRIVE_FILE,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=True,
        fallback_summary=f"File: {unique_id}.pdf",
        metadata={
            "google_drive_file_id": unique_id,
            "google_drive_file_name": f"{unique_id}.pdf",
            "document_type": "Google Drive File",
        },
    )


@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_drive_pipeline_creates_ready_document(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """A Drive ConnectorDocument flows through prepare + index to a READY document."""
    space_id = db_search_space.id
    doc = _drive_doc(
        unique_id="file-abc",
        search_space_id=space_id,
        connector_id=db_connector.id,
        user_id=str(db_user.id),
    )

    service = IndexingPipelineService(session=db_session)
    prepared = await service.prepare_for_indexing([doc])
    assert len(prepared) == 1

    await service.index(prepared[0], doc, llm=mocker.Mock())

    result = await db_session.execute(
        select(Document).filter(Document.search_space_id == space_id)
    )
    row = result.scalars().first()

    assert row is not None
    assert row.document_type == DocumentType.GOOGLE_DRIVE_FILE
    assert DocumentStatus.is_state(row.status, DocumentStatus.READY)


@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_drive_legacy_doc_migrated(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """A legacy Composio Drive doc is migrated and reused."""
    space_id = db_search_space.id
    user_id = str(db_user.id)
    file_id = "file-legacy-drive"

    legacy_hash = compute_identifier_hash(
        DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR.value, file_id, space_id
    )
    legacy_doc = Document(
        title="Old Drive File",
        document_type=DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
        content="old file summary",
        content_hash=f"ch-{legacy_hash[:12]}",
        unique_identifier_hash=legacy_hash,
        source_markdown="## Old file content",
        search_space_id=space_id,
        created_by_id=user_id,
        embedding=[0.1] * _EMBEDDING_DIM,
        status={"state": "ready"},
    )
    db_session.add(legacy_doc)
    await db_session.flush()
    original_id = legacy_doc.id

    connector_doc = _drive_doc(
        unique_id=file_id,
        search_space_id=space_id,
        connector_id=db_connector.id,
        user_id=user_id,
    )

    service = IndexingPipelineService(session=db_session)
    await service.migrate_legacy_docs([connector_doc])

    result = await db_session.execute(select(Document).filter(Document.id == original_id))
    row = result.scalars().first()

    assert row.document_type == DocumentType.GOOGLE_DRIVE_FILE
    native_hash = compute_identifier_hash(
        DocumentType.GOOGLE_DRIVE_FILE.value, file_id, space_id
    )
    assert row.unique_identifier_hash == native_hash
@@ -0,0 +1,116 @@
"""Integration tests: Gmail indexer builds ConnectorDocuments that flow through the pipeline."""

import pytest
from sqlalchemy import select

from app.config import config as app_config
from app.db import Document, DocumentStatus, DocumentType
from app.indexing_pipeline.connector_document import ConnectorDocument
from app.indexing_pipeline.document_hashing import (
    compute_identifier_hash,
    compute_unique_identifier_hash,
)
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService

_EMBEDDING_DIM = app_config.embedding_model_instance.dimension

pytestmark = pytest.mark.integration


def _gmail_doc(*, unique_id: str, search_space_id: int, connector_id: int, user_id: str) -> ConnectorDocument:
    """Build a Gmail-style ConnectorDocument like the real indexer does."""
    return ConnectorDocument(
        title=f"Subject for {unique_id}",
        source_markdown=f"## Email\n\nBody of {unique_id}",
        unique_id=unique_id,
        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
        search_space_id=search_space_id,
        connector_id=connector_id,
        created_by_id=user_id,
        should_summarize=True,
        fallback_summary=f"Gmail: Subject for {unique_id}",
        metadata={
            "message_id": unique_id,
            "from": "sender@example.com",
            "document_type": "Gmail Message",
        },
    )


@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_gmail_pipeline_creates_ready_document(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """A Gmail ConnectorDocument flows through prepare + index to a READY document."""
    space_id = db_search_space.id
    doc = _gmail_doc(
        unique_id="msg-pipeline-1",
        search_space_id=space_id,
        connector_id=db_connector.id,
        user_id=str(db_user.id),
    )

    service = IndexingPipelineService(session=db_session)
    prepared = await service.prepare_for_indexing([doc])
    assert len(prepared) == 1

    await service.index(prepared[0], doc, llm=mocker.Mock())

    result = await db_session.execute(
        select(Document).filter(Document.search_space_id == space_id)
    )
    row = result.scalars().first()

    assert row is not None
    assert row.document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR
    assert DocumentStatus.is_state(row.status, DocumentStatus.READY)
    assert row.source_markdown == doc.source_markdown


@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_gmail_legacy_doc_migrated_then_reused(
    db_session, db_search_space, db_connector, db_user, mocker
):
    """A legacy Composio Gmail doc is migrated then reused by the pipeline."""
    space_id = db_search_space.id
    user_id = str(db_user.id)
    msg_id = "msg-legacy-gmail"

    legacy_hash = compute_identifier_hash(
        DocumentType.COMPOSIO_GMAIL_CONNECTOR.value, msg_id, space_id
    )
    legacy_doc = Document(
        title="Old Gmail",
        document_type=DocumentType.COMPOSIO_GMAIL_CONNECTOR,
        content="old summary",
        content_hash=f"ch-{legacy_hash[:12]}",
        unique_identifier_hash=legacy_hash,
        source_markdown="## Old content",
        search_space_id=space_id,
        created_by_id=user_id,
        embedding=[0.1] * _EMBEDDING_DIM,
        status={"state": "ready"},
    )
    db_session.add(legacy_doc)
    await db_session.flush()
    original_id = legacy_doc.id

    connector_doc = _gmail_doc(
        unique_id=msg_id,
        search_space_id=space_id,
        connector_id=db_connector.id,
        user_id=user_id,
    )

    service = IndexingPipelineService(session=db_session)
    await service.migrate_legacy_docs([connector_doc])

    prepared = await service.prepare_for_indexing([connector_doc])
    assert len(prepared) == 1
    assert prepared[0].id == original_id
    assert prepared[0].document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR

    native_hash = compute_identifier_hash(
        DocumentType.GOOGLE_GMAIL_CONNECTOR.value, msg_id, space_id
    )
    assert prepared[0].unique_identifier_hash == native_hash
@@ -0,0 +1,55 @@
"""Integration tests for IndexingPipelineService.index_batch()."""

import pytest
from sqlalchemy import select

from app.db import Document, DocumentStatus, DocumentType
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService

pytestmark = pytest.mark.integration


@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_index_batch_creates_ready_documents(
    db_session, db_search_space, make_connector_document, mocker
):
    """index_batch prepares and indexes a batch, resulting in READY documents."""
    space_id = db_search_space.id
    docs = [
        make_connector_document(
            document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
            unique_id="msg-batch-1",
            search_space_id=space_id,
            source_markdown="## Email 1\n\nBody",
        ),
        make_connector_document(
            document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
            unique_id="msg-batch-2",
            search_space_id=space_id,
            source_markdown="## Email 2\n\nDifferent body",
        ),
    ]

    service = IndexingPipelineService(session=db_session)
    results = await service.index_batch(docs, llm=mocker.Mock())

    assert len(results) == 2

    result = await db_session.execute(
        select(Document).filter(Document.search_space_id == space_id)
    )
    rows = result.scalars().all()
    assert len(rows) == 2

    for row in rows:
        assert DocumentStatus.is_state(row.status, DocumentStatus.READY)
        assert row.content is not None
        assert row.embedding is not None


@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
async def test_index_batch_empty_returns_empty(db_session, mocker):
    """index_batch with empty input returns an empty list."""
    service = IndexingPipelineService(session=db_session)
    results = await service.index_batch([], llm=mocker.Mock())
    assert results == []
@@ -0,0 +1,92 @@
"""Integration tests for IndexingPipelineService.migrate_legacy_docs()."""

import pytest
from sqlalchemy import select

from app.config import config as app_config
from app.db import Document, DocumentType
from app.indexing_pipeline.document_hashing import compute_identifier_hash
from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService

_EMBEDDING_DIM = app_config.embedding_model_instance.dimension

pytestmark = pytest.mark.integration


async def test_legacy_composio_gmail_doc_migrated_in_db(
    db_session, db_search_space, db_user, make_connector_document
):
    """A Composio Gmail doc in the DB gets its hash and type updated to native."""
    space_id = db_search_space.id
    user_id = str(db_user.id)
    unique_id = "msg-legacy-123"

    legacy_hash = compute_identifier_hash(
        DocumentType.COMPOSIO_GMAIL_CONNECTOR.value, unique_id, space_id
    )
    native_hash = compute_identifier_hash(
        DocumentType.GOOGLE_GMAIL_CONNECTOR.value, unique_id, space_id
    )

    legacy_doc = Document(
        title="Old Gmail",
        document_type=DocumentType.COMPOSIO_GMAIL_CONNECTOR,
        content="legacy content",
        content_hash=f"ch-{legacy_hash[:12]}",
        unique_identifier_hash=legacy_hash,
        search_space_id=space_id,
        created_by_id=user_id,
        embedding=[0.1] * _EMBEDDING_DIM,
        status={"state": "ready"},
    )
    db_session.add(legacy_doc)
    await db_session.flush()
    doc_id = legacy_doc.id

    connector_doc = make_connector_document(
        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
        unique_id=unique_id,
        search_space_id=space_id,
    )

    service = IndexingPipelineService(session=db_session)
    await service.migrate_legacy_docs([connector_doc])

    result = await db_session.execute(select(Document).filter(Document.id == doc_id))
    reloaded = result.scalars().first()

    assert reloaded.unique_identifier_hash == native_hash
    assert reloaded.document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR


async def test_no_legacy_doc_is_noop(
    db_session, db_search_space, make_connector_document
):
    """When no legacy document exists, migrate_legacy_docs does nothing."""
    connector_doc = make_connector_document(
        document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
        unique_id="evt-no-legacy",
        search_space_id=db_search_space.id,
    )

    service = IndexingPipelineService(session=db_session)
    await service.migrate_legacy_docs([connector_doc])

    result = await db_session.execute(
        select(Document).filter(Document.search_space_id == db_search_space.id)
    )
    assert result.scalars().all() == []


async def test_non_google_type_is_skipped(
    db_session, db_search_space, make_connector_document
):
    """migrate_legacy_docs skips ConnectorDocuments that are not Google types."""
    connector_doc = make_connector_document(
        document_type=DocumentType.CLICKUP_CONNECTOR,
        unique_id="task-1",
        search_space_id=db_search_space.id,
    )

    service = IndexingPipelineService(session=db_session)
    await service.migrate_legacy_docs([connector_doc])