diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_calendar_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_calendar_pipeline.py
new file mode 100644
index 000000000..6a60c5cc1
--- /dev/null
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_calendar_pipeline.py
@@ -0,0 +1,113 @@
+"""Integration tests: Calendar indexer builds ConnectorDocuments that flow through the pipeline."""
+
+import pytest
+from sqlalchemy import select
+
+from app.config import config as app_config
+from app.db import Document, DocumentStatus, DocumentType
+from app.indexing_pipeline.connector_document import ConnectorDocument
+from app.indexing_pipeline.document_hashing import compute_identifier_hash
+from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
+
+_EMBEDDING_DIM = app_config.embedding_model_instance.dimension
+
+pytestmark = pytest.mark.integration
+
+
+def _cal_doc(*, unique_id: str, search_space_id: int, connector_id: int, user_id: str) -> ConnectorDocument:
+    """Build a Google Calendar ConnectorDocument with fixed event metadata."""
+    return ConnectorDocument(
+        title=f"Event {unique_id}",
+        source_markdown=f"## Calendar Event\n\nDetails for {unique_id}",
+        unique_id=unique_id,
+        document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
+        search_space_id=search_space_id,
+        connector_id=connector_id,
+        created_by_id=user_id,
+        should_summarize=True,
+        fallback_summary=f"Calendar: Event {unique_id}",
+        metadata={
+            "event_id": unique_id,
+            "start_time": "2025-01-15T10:00:00",
+            "end_time": "2025-01-15T11:00:00",
+            "document_type": "Google Calendar Event",
+        },
+    )
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_calendar_pipeline_creates_ready_document(
+    db_session, db_search_space, db_connector, db_user, mocker
+):
+    """A Calendar ConnectorDocument flows through prepare + index to a READY document."""
+    space_id = db_search_space.id
+    doc = _cal_doc(
+        unique_id="evt-1",
+        search_space_id=space_id,
+        connector_id=db_connector.id,
+        user_id=str(db_user.id),
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    prepared = await service.prepare_for_indexing([doc])
+    assert len(prepared) == 1
+
+    await service.index(prepared[0], doc, llm=mocker.Mock())
+
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == space_id)
+    )
+    row = result.scalars().first()
+
+    assert row is not None
+    assert row.document_type == DocumentType.GOOGLE_CALENDAR_CONNECTOR
+    assert DocumentStatus.is_state(row.status, DocumentStatus.READY)
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_calendar_legacy_doc_migrated(
+    db_session, db_search_space, db_connector, db_user, mocker
+):
+    """A legacy Composio Calendar doc is migrated in place to the native type and hash."""
+    space_id = db_search_space.id
+    user_id = str(db_user.id)
+    evt_id = "evt-legacy-cal"
+
+    legacy_hash = compute_identifier_hash(
+        DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR.value, evt_id, space_id
+    )
+    legacy_doc = Document(
+        title="Old Calendar Event",
+        document_type=DocumentType.COMPOSIO_GOOGLE_CALENDAR_CONNECTOR,
+        content="old summary",
+        content_hash=f"ch-{legacy_hash[:12]}",
+        unique_identifier_hash=legacy_hash,
+        source_markdown="## Old event",
+        search_space_id=space_id,
+        created_by_id=user_id,
+        embedding=[0.1] * _EMBEDDING_DIM,
+        status={"state": "ready"},
+    )
+    db_session.add(legacy_doc)
+    await db_session.flush()
+    original_id = legacy_doc.id
+
+    connector_doc = _cal_doc(
+        unique_id=evt_id,
+        search_space_id=space_id,
+        connector_id=db_connector.id,
+        user_id=user_id,
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    await service.migrate_legacy_docs([connector_doc])
+
+    result = await db_session.execute(select(Document).filter(Document.id == original_id))
+    row = result.scalars().first()
+
+    assert row is not None
+    assert row.document_type == DocumentType.GOOGLE_CALENDAR_CONNECTOR
+    native_hash = compute_identifier_hash(
+        DocumentType.GOOGLE_CALENDAR_CONNECTOR.value, evt_id, space_id
+    )
+    assert row.unique_identifier_hash == native_hash
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py
new file mode 100644
index 000000000..32af0b8c1
--- /dev/null
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_drive_pipeline.py
@@ -0,0 +1,112 @@
+"""Integration tests: Drive indexer builds ConnectorDocuments that flow through the pipeline."""
+
+import pytest
+from sqlalchemy import select
+
+from app.config import config as app_config
+from app.db import Document, DocumentStatus, DocumentType
+from app.indexing_pipeline.connector_document import ConnectorDocument
+from app.indexing_pipeline.document_hashing import compute_identifier_hash
+from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
+
+_EMBEDDING_DIM = app_config.embedding_model_instance.dimension
+
+pytestmark = pytest.mark.integration
+
+
+def _drive_doc(*, unique_id: str, search_space_id: int, connector_id: int, user_id: str) -> ConnectorDocument:
+    """Build a Google Drive ConnectorDocument for a PDF-named file."""
+    return ConnectorDocument(
+        title=f"File {unique_id}.pdf",
+        source_markdown=f"## Document Content\n\nText from file {unique_id}",
+        unique_id=unique_id,
+        document_type=DocumentType.GOOGLE_DRIVE_FILE,
+        search_space_id=search_space_id,
+        connector_id=connector_id,
+        created_by_id=user_id,
+        should_summarize=True,
+        fallback_summary=f"File: {unique_id}.pdf",
+        metadata={
+            "google_drive_file_id": unique_id,
+            "google_drive_file_name": f"{unique_id}.pdf",
+            "document_type": "Google Drive File",
+        },
+    )
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_drive_pipeline_creates_ready_document(
+    db_session, db_search_space, db_connector, db_user, mocker
+):
+    """A Drive ConnectorDocument flows through prepare + index to a READY document."""
+    space_id = db_search_space.id
+    doc = _drive_doc(
+        unique_id="file-abc",
+        search_space_id=space_id,
+        connector_id=db_connector.id,
+        user_id=str(db_user.id),
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    prepared = await service.prepare_for_indexing([doc])
+    assert len(prepared) == 1
+
+    await service.index(prepared[0], doc, llm=mocker.Mock())
+
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == space_id)
+    )
+    row = result.scalars().first()
+
+    assert row is not None
+    assert row.document_type == DocumentType.GOOGLE_DRIVE_FILE
+    assert DocumentStatus.is_state(row.status, DocumentStatus.READY)
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_drive_legacy_doc_migrated(
+    db_session, db_search_space, db_connector, db_user, mocker
+):
+    """A legacy Composio Drive doc is migrated in place to the native type and hash."""
+    space_id = db_search_space.id
+    user_id = str(db_user.id)
+    file_id = "file-legacy-drive"
+
+    legacy_hash = compute_identifier_hash(
+        DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR.value, file_id, space_id
+    )
+    legacy_doc = Document(
+        title="Old Drive File",
+        document_type=DocumentType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
+        content="old file summary",
+        content_hash=f"ch-{legacy_hash[:12]}",
+        unique_identifier_hash=legacy_hash,
+        source_markdown="## Old file content",
+        search_space_id=space_id,
+        created_by_id=user_id,
+        embedding=[0.1] * _EMBEDDING_DIM,
+        status={"state": "ready"},
+    )
+    db_session.add(legacy_doc)
+    await db_session.flush()
+    original_id = legacy_doc.id
+
+    connector_doc = _drive_doc(
+        unique_id=file_id,
+        search_space_id=space_id,
+        connector_id=db_connector.id,
+        user_id=user_id,
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    await service.migrate_legacy_docs([connector_doc])
+
+    result = await db_session.execute(select(Document).filter(Document.id == original_id))
+    row = result.scalars().first()
+
+    assert row is not None
+    assert row.document_type == DocumentType.GOOGLE_DRIVE_FILE
+    native_hash = compute_identifier_hash(
+        DocumentType.GOOGLE_DRIVE_FILE.value, file_id, space_id
+    )
+    assert row.unique_identifier_hash == native_hash
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_gmail_pipeline.py b/surfsense_backend/tests/integration/indexing_pipeline/test_gmail_pipeline.py
new file mode 100644
index 000000000..d67420cb7
--- /dev/null
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_gmail_pipeline.py
@@ -0,0 +1,116 @@
+"""Integration tests: Gmail indexer builds ConnectorDocuments that flow through the pipeline."""
+
+import pytest
+from sqlalchemy import select
+
+from app.config import config as app_config
+from app.db import Document, DocumentStatus, DocumentType
+from app.indexing_pipeline.connector_document import ConnectorDocument
+from app.indexing_pipeline.document_hashing import (
+    compute_identifier_hash,
+    compute_unique_identifier_hash,
+)
+from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
+
+_EMBEDDING_DIM = app_config.embedding_model_instance.dimension
+
+pytestmark = pytest.mark.integration
+
+
+def _gmail_doc(*, unique_id: str, search_space_id: int, connector_id: int, user_id: str) -> ConnectorDocument:
+    """Build a Gmail-style ConnectorDocument like the real indexer does."""
+    return ConnectorDocument(
+        title=f"Subject for {unique_id}",
+        source_markdown=f"## Email\n\nBody of {unique_id}",
+        unique_id=unique_id,
+        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+        search_space_id=search_space_id,
+        connector_id=connector_id,
+        created_by_id=user_id,
+        should_summarize=True,
+        fallback_summary=f"Gmail: Subject for {unique_id}",
+        metadata={
+            "message_id": unique_id,
+            "from": "sender@example.com",
+            "document_type": "Gmail Message",
+        },
+    )
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_gmail_pipeline_creates_ready_document(
+    db_session, db_search_space, db_connector, db_user, mocker
+):
+    """A Gmail ConnectorDocument flows through prepare + index to a READY document."""
+    space_id = db_search_space.id
+    doc = _gmail_doc(
+        unique_id="msg-pipeline-1",
+        search_space_id=space_id,
+        connector_id=db_connector.id,
+        user_id=str(db_user.id),
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    prepared = await service.prepare_for_indexing([doc])
+    assert len(prepared) == 1
+
+    await service.index(prepared[0], doc, llm=mocker.Mock())
+
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == space_id)
+    )
+    row = result.scalars().first()
+
+    assert row is not None
+    assert row.document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR
+    assert DocumentStatus.is_state(row.status, DocumentStatus.READY)
+    assert row.source_markdown == doc.source_markdown
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_gmail_legacy_doc_migrated_then_reused(
+    db_session, db_search_space, db_connector, db_user, mocker
+):
+    """A legacy Composio Gmail doc is migrated then reused by the pipeline."""
+    space_id = db_search_space.id
+    user_id = str(db_user.id)
+    msg_id = "msg-legacy-gmail"
+
+    legacy_hash = compute_identifier_hash(
+        DocumentType.COMPOSIO_GMAIL_CONNECTOR.value, msg_id, space_id
+    )
+    legacy_doc = Document(
+        title="Old Gmail",
+        document_type=DocumentType.COMPOSIO_GMAIL_CONNECTOR,
+        content="old summary",
+        content_hash=f"ch-{legacy_hash[:12]}",
+        unique_identifier_hash=legacy_hash,
+        source_markdown="## Old content",
+        search_space_id=space_id,
+        created_by_id=user_id,
+        embedding=[0.1] * _EMBEDDING_DIM,
+        status={"state": "ready"},
+    )
+    db_session.add(legacy_doc)
+    await db_session.flush()
+    original_id = legacy_doc.id
+
+    connector_doc = _gmail_doc(
+        unique_id=msg_id,
+        search_space_id=space_id,
+        connector_id=db_connector.id,
+        user_id=user_id,
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    await service.migrate_legacy_docs([connector_doc])
+
+    prepared = await service.prepare_for_indexing([connector_doc])
+    assert len(prepared) == 1
+    assert prepared[0].id == original_id
+    assert prepared[0].document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR
+
+    native_hash = compute_identifier_hash(
+        DocumentType.GOOGLE_GMAIL_CONNECTOR.value, msg_id, space_id
+    )
+    assert prepared[0].unique_identifier_hash == native_hash
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_index_batch.py b/surfsense_backend/tests/integration/indexing_pipeline/test_index_batch.py
new file mode 100644
index 000000000..a40498769
--- /dev/null
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_index_batch.py
@@ -0,0 +1,55 @@
+"""Integration tests for IndexingPipelineService.index_batch()."""
+
+import pytest
+from sqlalchemy import select
+
+from app.db import Document, DocumentStatus, DocumentType
+from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
+
+pytestmark = pytest.mark.integration
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_index_batch_creates_ready_documents(
+    db_session, db_search_space, make_connector_document, mocker
+):
+    """index_batch prepares and indexes a batch, resulting in READY documents."""
+    space_id = db_search_space.id
+    docs = [
+        make_connector_document(
+            document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+            unique_id="msg-batch-1",
+            search_space_id=space_id,
+            source_markdown="## Email 1\n\nBody",
+        ),
+        make_connector_document(
+            document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+            unique_id="msg-batch-2",
+            search_space_id=space_id,
+            source_markdown="## Email 2\n\nDifferent body",
+        ),
+    ]
+
+    service = IndexingPipelineService(session=db_session)
+    results = await service.index_batch(docs, llm=mocker.Mock())
+
+    assert len(results) == 2
+
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == space_id)
+    )
+    rows = result.scalars().all()
+    assert len(rows) == 2
+
+    for row in rows:
+        assert DocumentStatus.is_state(row.status, DocumentStatus.READY)
+        assert row.content is not None
+        assert row.embedding is not None
+
+
+@pytest.mark.usefixtures("patched_summarize", "patched_embed_texts", "patched_chunk_text")
+async def test_index_batch_empty_returns_empty(db_session, mocker):
+    """index_batch with empty input returns an empty list."""
+    service = IndexingPipelineService(session=db_session)
+    results = await service.index_batch([], llm=mocker.Mock())
+    assert results == []
diff --git a/surfsense_backend/tests/integration/indexing_pipeline/test_migrate_legacy_docs.py b/surfsense_backend/tests/integration/indexing_pipeline/test_migrate_legacy_docs.py
new file mode 100644
index 000000000..8fc0e7586
--- /dev/null
+++ b/surfsense_backend/tests/integration/indexing_pipeline/test_migrate_legacy_docs.py
@@ -0,0 +1,99 @@
+"""Integration tests for IndexingPipelineService.migrate_legacy_docs()."""
+
+import pytest
+from sqlalchemy import select
+
+from app.config import config as app_config
+from app.db import Document, DocumentType
+from app.indexing_pipeline.document_hashing import compute_identifier_hash
+from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
+
+_EMBEDDING_DIM = app_config.embedding_model_instance.dimension
+
+pytestmark = pytest.mark.integration
+
+
+async def test_legacy_composio_gmail_doc_migrated_in_db(
+    db_session, db_search_space, db_user, make_connector_document
+):
+    """A Composio Gmail doc in the DB gets its hash and type updated to native."""
+    space_id = db_search_space.id
+    user_id = str(db_user.id)
+    unique_id = "msg-legacy-123"
+
+    legacy_hash = compute_identifier_hash(
+        DocumentType.COMPOSIO_GMAIL_CONNECTOR.value, unique_id, space_id
+    )
+    native_hash = compute_identifier_hash(
+        DocumentType.GOOGLE_GMAIL_CONNECTOR.value, unique_id, space_id
+    )
+
+    legacy_doc = Document(
+        title="Old Gmail",
+        document_type=DocumentType.COMPOSIO_GMAIL_CONNECTOR,
+        content="legacy content",
+        content_hash=f"ch-{legacy_hash[:12]}",
+        unique_identifier_hash=legacy_hash,
+        search_space_id=space_id,
+        created_by_id=user_id,
+        embedding=[0.1] * _EMBEDDING_DIM,
+        status={"state": "ready"},
+    )
+    db_session.add(legacy_doc)
+    await db_session.flush()
+    doc_id = legacy_doc.id
+
+    connector_doc = make_connector_document(
+        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+        unique_id=unique_id,
+        search_space_id=space_id,
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    await service.migrate_legacy_docs([connector_doc])
+
+    result = await db_session.execute(select(Document).filter(Document.id == doc_id))
+    reloaded = result.scalars().first()
+
+    assert reloaded is not None
+    assert reloaded.unique_identifier_hash == native_hash
+    assert reloaded.document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR
+
+
+async def test_no_legacy_doc_is_noop(
+    db_session, db_search_space, make_connector_document
+):
+    """When no legacy document exists, migrate_legacy_docs does nothing."""
+    connector_doc = make_connector_document(
+        document_type=DocumentType.GOOGLE_CALENDAR_CONNECTOR,
+        unique_id="evt-no-legacy",
+        search_space_id=db_search_space.id,
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    await service.migrate_legacy_docs([connector_doc])
+
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == db_search_space.id)
+    )
+    assert result.scalars().all() == []
+
+
+async def test_non_google_type_is_skipped(
+    db_session, db_search_space, make_connector_document
+):
+    """migrate_legacy_docs skips ConnectorDocuments that are not Google types."""
+    connector_doc = make_connector_document(
+        document_type=DocumentType.CLICKUP_CONNECTOR,
+        unique_id="task-1",
+        search_space_id=db_search_space.id,
+    )
+
+    service = IndexingPipelineService(session=db_session)
+    await service.migrate_legacy_docs([connector_doc])
+
+    # Skipping must leave the search space untouched: no documents created.
+    result = await db_session.execute(
+        select(Document).filter(Document.search_space_id == db_search_space.id)
+    )
+    assert result.scalars().all() == []
diff --git a/surfsense_backend/tests/unit/indexing_pipeline/test_index_batch.py b/surfsense_backend/tests/unit/indexing_pipeline/test_index_batch.py
new file mode 100644
index 000000000..dcd097d20
--- /dev/null
+++ b/surfsense_backend/tests/unit/indexing_pipeline/test_index_batch.py
@@ -0,0 +1,87 @@
+"""Unit tests for IndexingPipelineService.index_batch()."""
+
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from app.db import Document, DocumentType
+from app.indexing_pipeline.document_hashing import compute_unique_identifier_hash
+from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
+
+pytestmark = pytest.mark.unit
+
+
+@pytest.fixture
+def mock_session():
+    """Async mock standing in for the database session."""
+    return AsyncMock()
+
+
+@pytest.fixture
+def pipeline(mock_session):
+    """IndexingPipelineService wired to the mocked session."""
+    return IndexingPipelineService(mock_session)
+
+
+async def test_calls_prepare_then_index_per_document(
+    pipeline, make_connector_document
+):
+    """index_batch calls prepare_for_indexing, then index() for each returned doc."""
+    doc1 = make_connector_document(
+        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+        unique_id="msg-1",
+        search_space_id=1,
+    )
+    doc2 = make_connector_document(
+        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+        unique_id="msg-2",
+        search_space_id=1,
+    )
+
+    orm1 = MagicMock(spec=Document)
+    orm1.unique_identifier_hash = compute_unique_identifier_hash(doc1)
+    orm2 = MagicMock(spec=Document)
+    orm2.unique_identifier_hash = compute_unique_identifier_hash(doc2)
+
+    mock_llm = MagicMock()
+
+    pipeline.prepare_for_indexing = AsyncMock(return_value=[orm1, orm2])
+    # Echo the ORM document back so results can be compared to the prepared list.
+    pipeline.index = AsyncMock(side_effect=lambda doc, cdoc, llm: doc)
+
+    results = await pipeline.index_batch([doc1, doc2], mock_llm)
+
+    pipeline.prepare_for_indexing.assert_awaited_once_with([doc1, doc2])
+    assert pipeline.index.await_count == 2
+    assert results == [orm1, orm2]
+
+
+async def test_empty_input_returns_empty(pipeline):
+    """Empty connector_docs list returns empty result."""
+    pipeline.prepare_for_indexing = AsyncMock(return_value=[])
+
+    results = await pipeline.index_batch([], MagicMock())
+
+    assert results == []
+
+
+async def test_skips_document_without_matching_connector_doc(
+    pipeline, make_connector_document
+):
+    """If prepare returns a doc whose hash has no matching ConnectorDocument, it's skipped."""
+    doc1 = make_connector_document(
+        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+        unique_id="msg-1",
+        search_space_id=1,
+    )
+
+    orphan_orm = MagicMock(spec=Document)
+    orphan_orm.unique_identifier_hash = "nonexistent-hash"
+
+    pipeline.prepare_for_indexing = AsyncMock(return_value=[orphan_orm])
+    pipeline.index = AsyncMock()
+
+    results = await pipeline.index_batch([doc1], MagicMock())
+
+    pipeline.index.assert_not_awaited()
+    assert results == []
diff --git a/surfsense_backend/tests/unit/indexing_pipeline/test_migrate_legacy_docs.py b/surfsense_backend/tests/unit/indexing_pipeline/test_migrate_legacy_docs.py
new file mode 100644
index 000000000..9334fe678
--- /dev/null
+++ b/surfsense_backend/tests/unit/indexing_pipeline/test_migrate_legacy_docs.py
@@ -0,0 +1,134 @@
+"""Unit tests for IndexingPipelineService.migrate_legacy_docs()."""
+
+from unittest.mock import AsyncMock, MagicMock
+
+import pytest
+
+from app.db import Document, DocumentType
+from app.indexing_pipeline.document_hashing import compute_identifier_hash
+from app.indexing_pipeline.indexing_pipeline_service import IndexingPipelineService
+
+pytestmark = pytest.mark.unit
+
+
+@pytest.fixture
+def mock_session():
+    """Async mock standing in for the database session."""
+    return AsyncMock()
+
+
+@pytest.fixture
+def pipeline(mock_session):
+    """IndexingPipelineService wired to the mocked session."""
+    return IndexingPipelineService(mock_session)
+
+
+def _make_execute_side_effect(doc_by_hash: dict):
+    """Return a side_effect for session.execute that resolves documents by hash."""
+
+    async def _side_effect(stmt):
+        # Render the statement once; the SQL text is the same for every hash.
+        sql = str(stmt.compile(compile_kwargs={"literal_binds": True}))
+        result = MagicMock()
+        result.scalars.return_value.first.return_value = next(
+            (doc for h, doc in doc_by_hash.items() if h in sql), None
+        )
+        return result
+
+    return _side_effect
+
+
+async def test_updates_hash_and_type_for_legacy_document(
+    pipeline, mock_session, make_connector_document
+):
+    """Legacy Composio document gets unique_identifier_hash and document_type updated."""
+    doc = make_connector_document(
+        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+        unique_id="msg-abc",
+        search_space_id=1,
+    )
+
+    legacy_hash = compute_identifier_hash("COMPOSIO_GMAIL_CONNECTOR", "msg-abc", 1)
+    native_hash = compute_identifier_hash("GOOGLE_GMAIL_CONNECTOR", "msg-abc", 1)
+
+    existing = MagicMock(spec=Document)
+    existing.unique_identifier_hash = legacy_hash
+    existing.document_type = DocumentType.COMPOSIO_GMAIL_CONNECTOR
+
+    result_mock = MagicMock()
+    result_mock.scalars.return_value.first.return_value = existing
+    mock_session.execute = AsyncMock(return_value=result_mock)
+
+    await pipeline.migrate_legacy_docs([doc])
+
+    assert existing.unique_identifier_hash == native_hash
+    assert existing.document_type == DocumentType.GOOGLE_GMAIL_CONNECTOR
+    mock_session.commit.assert_awaited_once()
+
+
+async def test_noop_when_no_legacy_document_exists(
+    pipeline, mock_session, make_connector_document
+):
+    """No updates when no legacy Composio document is found in DB."""
+    doc = make_connector_document(
+        document_type=DocumentType.GOOGLE_GMAIL_CONNECTOR,
+        unique_id="msg-xyz",
+        search_space_id=1,
+    )
+
+    result_mock = MagicMock()
+    result_mock.scalars.return_value.first.return_value = None
+    mock_session.execute = AsyncMock(return_value=result_mock)
+
+    await pipeline.migrate_legacy_docs([doc])
+
+    mock_session.commit.assert_awaited_once()
+
+
+async def test_skips_non_google_doc_types(
+    pipeline, mock_session, make_connector_document
+):
+    """Non-Google doc types have no legacy mapping and trigger no DB query."""
+    doc = make_connector_document(
+        document_type=DocumentType.SLACK_CONNECTOR,
+        unique_id="slack-123",
+        search_space_id=1,
+    )
+
+    await pipeline.migrate_legacy_docs([doc])
+
+    mock_session.execute.assert_not_awaited()
+    mock_session.commit.assert_awaited_once()
+
+
+async def test_handles_all_three_google_types(
+    pipeline, mock_session, make_connector_document
+):
+    """Each native Google type correctly maps to its Composio legacy type."""
+    mappings = [
+        (DocumentType.GOOGLE_GMAIL_CONNECTOR, "COMPOSIO_GMAIL_CONNECTOR"),
+        (DocumentType.GOOGLE_CALENDAR_CONNECTOR, "COMPOSIO_GOOGLE_CALENDAR_CONNECTOR"),
+        (DocumentType.GOOGLE_DRIVE_FILE, "COMPOSIO_GOOGLE_DRIVE_CONNECTOR"),
+    ]
+    for native_type, expected_legacy in mappings:
+        doc = make_connector_document(
+            document_type=native_type,
+            unique_id="id-1",
+            search_space_id=1,
+        )
+
+        # Fresh mocks per iteration so state from one type cannot leak into the next.
+        existing = MagicMock(spec=Document)
+        existing.document_type = DocumentType(expected_legacy)
+
+        result_mock = MagicMock()
+        result_mock.scalars.return_value.first.return_value = existing
+        mock_session.execute = AsyncMock(return_value=result_mock)
+        mock_session.commit = AsyncMock()
+
+        await pipeline.migrate_legacy_docs([doc])
+
+        assert existing.document_type == native_type
+        assert existing.unique_identifier_hash == compute_identifier_hash(
+            native_type.value, "id-1", 1
+        )