refactor: Enhance test utilities for document upload by integrating search space handling

- Updated test fixtures to include search space ID retrieval for improved document upload tests.
- Refactored authentication and document upload functions to accept search space ID as a parameter.
- Removed hardcoded search space ID references to streamline test configurations.
This commit is contained in:
Anish Sarkar 2026-02-25 17:29:09 +05:30
parent 41eb68663a
commit 4ff712578d
3 changed files with 97 additions and 48 deletions

View file

@ -13,22 +13,51 @@ FIXTURES_DIR = Path(__file__).resolve().parent.parent / "fixtures"
BACKEND_URL = os.environ.get("TEST_BACKEND_URL", "http://localhost:8000")
TEST_EMAIL = os.environ.get("TEST_USER_EMAIL", "testuser@surfsense.com")
TEST_PASSWORD = os.environ.get("TEST_USER_PASSWORD", "testpassword123")
TEST_SEARCH_SPACE_ID = int(os.environ.get("TEST_SEARCH_SPACE_ID", "1"))
async def get_auth_token(client: httpx.AsyncClient) -> str:
"""Log in and return a Bearer JWT token."""
"""Log in and return a Bearer JWT token, registering the user first if needed."""
response = await client.post(
"/auth/jwt/login",
data={"username": TEST_EMAIL, "password": TEST_PASSWORD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
if response.status_code == 200:
return response.json()["access_token"]
reg_response = await client.post(
"/auth/register",
json={"email": TEST_EMAIL, "password": TEST_PASSWORD},
)
assert reg_response.status_code == 201, (
f"Registration failed ({reg_response.status_code}): {reg_response.text}"
)
response = await client.post(
"/auth/jwt/login",
data={"username": TEST_EMAIL, "password": TEST_PASSWORD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
)
assert response.status_code == 200, (
f"Login failed ({response.status_code}): {response.text}"
f"Login after registration failed ({response.status_code}): {response.text}"
)
return response.json()["access_token"]
async def get_search_space_id(client: httpx.AsyncClient, token: str) -> int:
"""Fetch the first search space owned by the test user."""
resp = await client.get(
"/api/v1/searchspaces",
headers=auth_headers(token),
)
assert resp.status_code == 200, (
f"Failed to list search spaces ({resp.status_code}): {resp.text}"
)
spaces = resp.json()
assert len(spaces) > 0, "No search spaces found for test user"
return spaces[0]["id"]
def auth_headers(token: str) -> dict[str, str]:
"""Return Authorization header dict for a Bearer token."""
return {"Authorization": f"Bearer {token}"}
@ -39,7 +68,7 @@ async def upload_file(
headers: dict[str, str],
fixture_name: str,
*,
search_space_id: int = TEST_SEARCH_SPACE_ID,
search_space_id: int,
filename_override: str | None = None,
) -> httpx.Response:
"""Upload a single fixture file and return the raw response."""
@ -59,7 +88,7 @@ async def upload_multiple_files(
headers: dict[str, str],
fixture_names: list[str],
*,
search_space_id: int = TEST_SEARCH_SPACE_ID,
search_space_id: int,
) -> httpx.Response:
"""Upload multiple fixture files in a single request."""
files = []
@ -86,7 +115,7 @@ async def poll_document_status(
headers: dict[str, str],
document_ids: list[int],
*,
search_space_id: int = TEST_SEARCH_SPACE_ID,
search_space_id: int,
timeout: float = 180.0,
interval: float = 3.0,
) -> dict[int, dict]: