SurfSense/surfsense_backend/tests/utils/helpers.py
Anish Sarkar 4ff712578d refactor: Enhance test utilities for document upload by integrating search space handling
- Updated test fixtures to include search space ID retrieval for improved document upload tests.
- Refactored authentication and document upload functions to accept search space ID as a parameter.
- Removed hardcoded search space ID references to streamline test configurations.
2026-02-25 17:29:09 +05:30

186 lines
5.6 KiB
Python

"""Shared test helpers for authentication, polling, and cleanup."""
from __future__ import annotations
import asyncio
import os
from pathlib import Path
import httpx
FIXTURES_DIR = Path(__file__).resolve().parent.parent / "fixtures"
BACKEND_URL = os.environ.get("TEST_BACKEND_URL", "http://localhost:8000")
TEST_EMAIL = os.environ.get("TEST_USER_EMAIL", "testuser@surfsense.com")
TEST_PASSWORD = os.environ.get("TEST_USER_PASSWORD", "testpassword123")
async def get_auth_token(client: httpx.AsyncClient) -> str:
"""Log in and return a Bearer JWT token, registering the user first if needed."""
response = await client.post(
"/auth/jwt/login",
data={"username": TEST_EMAIL, "password": TEST_PASSWORD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
if response.status_code == 200:
return response.json()["access_token"]
reg_response = await client.post(
"/auth/register",
json={"email": TEST_EMAIL, "password": TEST_PASSWORD},
)
assert reg_response.status_code == 201, (
f"Registration failed ({reg_response.status_code}): {reg_response.text}"
)
response = await client.post(
"/auth/jwt/login",
data={"username": TEST_EMAIL, "password": TEST_PASSWORD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
assert response.status_code == 200, (
f"Login after registration failed ({response.status_code}): {response.text}"
)
return response.json()["access_token"]
async def get_search_space_id(client: httpx.AsyncClient, token: str) -> int:
"""Fetch the first search space owned by the test user."""
resp = await client.get(
"/api/v1/searchspaces",
headers=auth_headers(token),
)
assert resp.status_code == 200, (
f"Failed to list search spaces ({resp.status_code}): {resp.text}"
)
spaces = resp.json()
assert len(spaces) > 0, "No search spaces found for test user"
return spaces[0]["id"]
def auth_headers(token: str) -> dict[str, str]:
"""Return Authorization header dict for a Bearer token."""
return {"Authorization": f"Bearer {token}"}
async def upload_file(
client: httpx.AsyncClient,
headers: dict[str, str],
fixture_name: str,
*,
search_space_id: int,
filename_override: str | None = None,
) -> httpx.Response:
"""Upload a single fixture file and return the raw response."""
file_path = FIXTURES_DIR / fixture_name
upload_name = filename_override or fixture_name
with open(file_path, "rb") as f:
return await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files={"files": (upload_name, f)},
data={"search_space_id": str(search_space_id)},
)
async def upload_multiple_files(
client: httpx.AsyncClient,
headers: dict[str, str],
fixture_names: list[str],
*,
search_space_id: int,
) -> httpx.Response:
"""Upload multiple fixture files in a single request."""
files = []
open_handles = []
try:
for name in fixture_names:
fh = open(FIXTURES_DIR / name, "rb") # noqa: SIM115
open_handles.append(fh)
files.append(("files", (name, fh)))
return await client.post(
"/api/v1/documents/fileupload",
headers=headers,
files=files,
data={"search_space_id": str(search_space_id)},
)
finally:
for fh in open_handles:
fh.close()
async def poll_document_status(
client: httpx.AsyncClient,
headers: dict[str, str],
document_ids: list[int],
*,
search_space_id: int,
timeout: float = 180.0,
interval: float = 3.0,
) -> dict[int, dict]:
"""
Poll ``GET /api/v1/documents/status`` until every document reaches a
terminal state (``ready`` or ``failed``) or *timeout* seconds elapse.
Returns a mapping of ``{document_id: status_item_dict}``.
"""
ids_param = ",".join(str(d) for d in document_ids)
terminal_states = {"ready", "failed"}
elapsed = 0.0
while elapsed < timeout:
resp = await client.get(
"/api/v1/documents/status",
headers=headers,
params={
"search_space_id": search_space_id,
"document_ids": ids_param,
},
)
assert resp.status_code == 200, (
f"Status poll failed ({resp.status_code}): {resp.text}"
)
items = {item["id"]: item for item in resp.json()["items"]}
if all(
items.get(did, {}).get("status", {}).get("state") in terminal_states
for did in document_ids
):
return items
await asyncio.sleep(interval)
elapsed += interval
raise TimeoutError(
f"Documents {document_ids} did not reach terminal state within {timeout}s. "
f"Last status: {items}"
)
async def get_document(
client: httpx.AsyncClient,
headers: dict[str, str],
document_id: int,
) -> dict:
"""Fetch a single document by ID."""
resp = await client.get(
f"/api/v1/documents/{document_id}",
headers=headers,
)
assert resp.status_code == 200, (
f"GET document {document_id} failed ({resp.status_code}): {resp.text}"
)
return resp.json()
async def delete_document(
client: httpx.AsyncClient,
headers: dict[str, str],
document_id: int,
) -> httpx.Response:
"""Delete a document by ID, returning the raw response."""
return await client.delete(
f"/api/v1/documents/{document_id}",
headers=headers,
)