test(backend): add Composio route integration tests

This commit is contained in:
Anish Sarkar 2026-05-06 17:19:32 +05:30
parent d4f806f134
commit 87dd5af259
4 changed files with 303 additions and 0 deletions

View file

@ -0,0 +1 @@
"""Integration tests for Composio connector routes."""

View file

@ -0,0 +1,90 @@
"""Composio route integration fixtures.
The sys.modules hijack happens at module import time, before importing
app.app, so production `from composio import Composio` bindings resolve to
the strict E2E fake in this pytest process too.
"""
from __future__ import annotations
import sys
from collections.abc import AsyncGenerator
import httpx
import pytest
import pytest_asyncio
from httpx import ASGITransport
from sqlalchemy.ext.asyncio import AsyncSession
from tests.e2e.fakes import composio_module as _fake_composio
sys.modules["composio"] = _fake_composio
from app.app import app, limiter # noqa: E402
from app.config import config # noqa: E402
from app.db import ( # noqa: E402
SearchSourceConnector,
SearchSourceConnectorType,
User,
get_async_session,
)
from app.users import current_active_user # noqa: E402
pytestmark = pytest.mark.integration
limiter.enabled = False
config.COMPOSIO_ENABLED = True
config.COMPOSIO_API_KEY = "e2e-integration-composio-sentinel"
config.NEXT_FRONTEND_URL = "http://localhost:3000"
@pytest_asyncio.fixture
async def client(
db_session: AsyncSession,
db_user: User,
) -> AsyncGenerator[httpx.AsyncClient, None]:
async def override_session() -> AsyncGenerator[AsyncSession, None]:
yield db_session
async def override_user() -> User:
return db_user
previous_overrides = app.dependency_overrides.copy()
app.dependency_overrides[get_async_session] = override_session
app.dependency_overrides[current_active_user] = override_user
try:
async with httpx.AsyncClient(
transport=ASGITransport(app=app),
base_url="http://test",
timeout=30.0,
follow_redirects=False,
) as test_client:
yield test_client
finally:
app.dependency_overrides.clear()
app.dependency_overrides.update(previous_overrides)
@pytest_asyncio.fixture
async def drive_connector(
db_session: AsyncSession,
db_user: User,
db_search_space,
) -> SearchSourceConnector:
connector = SearchSourceConnector(
name="Google Drive (Composio) - e2e-fake@surfsense.example",
connector_type=SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
is_indexable=True,
config={
"composio_connected_account_id": "fake-acct-googledrive-existing",
"toolkit_id": "googledrive",
"toolkit_name": "Google Drive",
"is_indexable": True,
},
search_space_id=db_search_space.id,
user_id=db_user.id,
)
db_session.add(connector)
await db_session.flush()
return connector

View file

@ -0,0 +1,99 @@
from typing import Any
import httpx
import pytest
from sqlalchemy.ext.asyncio import AsyncSession
from app.db import SearchSourceConnector
from tests.e2e.fakes import composio_module
pytestmark = pytest.mark.integration
async def test_root_listing_returns_canned_items(
client: httpx.AsyncClient,
drive_connector: SearchSourceConnector,
):
response = await client.get(
f"/api/v1/connectors/{drive_connector.id}/composio-drive/folders"
)
assert response.status_code == 200
items = response.json()["items"]
names = {item["name"] for item in items}
assert "Projects" in names
assert "e2e-canary.txt" in names
assert any(
item["id"] == "fake-folder-projects" and item["isFolder"] is True
for item in items
)
async def test_save_round_trips_selected_files(
client: httpx.AsyncClient,
db_session: AsyncSession,
drive_connector: SearchSourceConnector,
):
selected_files = [
{
"id": "fake-file-canary",
"name": "e2e-canary.txt",
"mimeType": "text/plain",
}
]
response = await client.put(
f"/api/v1/search-source-connectors/{drive_connector.id}",
json={
"config": {
"selected_folders": [],
"selected_files": selected_files,
"indexing_options": {
"max_files_per_folder": 10,
"incremental_sync": False,
"include_subfolders": False,
},
}
},
)
assert response.status_code == 200
await db_session.refresh(drive_connector)
assert drive_connector.config["selected_files"] == selected_files
assert drive_connector.config["selected_folders"] == []
async def test_auth_expired_error_classifies_and_flags_connector(
monkeypatch: pytest.MonkeyPatch,
client: httpx.AsyncClient,
db_session: AsyncSession,
drive_connector: SearchSourceConnector,
):
def raise_auth_expired(
self: Any,
*,
slug: str,
connected_account_id: str,
user_id: str | None = None,
arguments: dict[str, Any] | None = None,
dangerously_skip_version_check: bool = True,
**kwargs: Any,
) -> dict[str, Any]:
raise RuntimeError(
"Token has been expired or revoked. (HTTP 401: invalid_grant)"
)
monkeypatch.setattr(composio_module._Tools, "execute", raise_auth_expired)
response = await client.get(
f"/api/v1/connectors/{drive_connector.id}/composio-drive/folders"
)
assert response.status_code == 400
body = response.text.lower()
assert "authentication" in body
assert "expired" in body
await db_session.refresh(drive_connector)
assert drive_connector.config["auth_expired"] is True

View file

@ -0,0 +1,113 @@
from uuid import UUID
import httpx
import pytest
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.config import config
from app.db import (
SearchSourceConnector,
SearchSourceConnectorType,
SearchSpace,
User,
)
from app.utils.oauth_security import OAuthStateManager
pytestmark = pytest.mark.integration
def _state_for(space_id: int, user_id: UUID, toolkit_id: str = "googledrive") -> str:
return OAuthStateManager(config.SECRET_KEY).generate_secure_state(
space_id=space_id,
user_id=user_id,
toolkit_id=toolkit_id,
)
async def _drive_connectors(
session: AsyncSession,
*,
user_id: UUID,
search_space_id: int,
) -> list[SearchSourceConnector]:
result = await session.execute(
select(SearchSourceConnector).where(
SearchSourceConnector.user_id == user_id,
SearchSourceConnector.search_space_id == search_space_id,
SearchSourceConnector.connector_type
== SearchSourceConnectorType.COMPOSIO_GOOGLE_DRIVE_CONNECTOR,
)
)
return list(result.scalars().all())
async def test_callback_with_error_param_redirects_to_denied_page(
client: httpx.AsyncClient,
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
state = _state_for(db_search_space.id, db_user.id)
response = await client.get(
f"/api/v1/auth/composio/connector/callback?state={state}&error=access_denied"
)
assert response.status_code in {302, 303, 307}
location = response.headers["location"]
assert (
f"/dashboard/{db_search_space.id}/connectors/callback?"
"error=composio_oauth_denied"
) in location
connectors = await _drive_connectors(
db_session,
user_id=db_user.id,
search_space_id=db_search_space.id,
)
assert connectors == []
async def test_second_oauth_for_same_toolkit_takes_reconnection_branch(
client: httpx.AsyncClient,
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
first_state = _state_for(db_search_space.id, db_user.id)
first_response = await client.get(
"/api/v1/auth/composio/connector/callback"
f"?state={first_state}&connectedAccountId=fake-acct-googledrive-first"
)
assert first_response.status_code in {302, 303, 307}
first_connectors = await _drive_connectors(
db_session,
user_id=db_user.id,
search_space_id=db_search_space.id,
)
assert len(first_connectors) == 1
first_connector = first_connectors[0]
assert first_connector.config["composio_connected_account_id"] == (
"fake-acct-googledrive-first"
)
second_state = _state_for(db_search_space.id, db_user.id)
second_response = await client.get(
"/api/v1/auth/composio/connector/callback"
f"?state={second_state}&connectedAccountId=fake-acct-googledrive-second"
)
assert second_response.status_code in {302, 303, 307}
second_connectors = await _drive_connectors(
db_session,
user_id=db_user.id,
search_space_id=db_search_space.id,
)
assert len(second_connectors) == 1
assert second_connectors[0].id == first_connector.id
assert second_connectors[0].config["composio_connected_account_id"] == (
"fake-acct-googledrive-second"
)