fix(tests):cover auth transport invariants

This commit is contained in:
Anish Sarkar 2026-06-24 03:55:39 +05:30
parent 62c7efb216
commit 766fa25ea0
3 changed files with 96 additions and 17 deletions

View file

@ -8,7 +8,6 @@ webhook fulfillment (idempotent), and the reconciliation fallback.
from __future__ import annotations
from types import SimpleNamespace
from urllib.parse import parse_qs, urlparse
import asyncpg
import httpx
@ -63,18 +62,13 @@ def _extract_access_token(response: httpx.Response) -> str | None:
if response.status_code == 200:
return response.json()["access_token"]
if response.status_code == 302:
location = response.headers.get("location", "")
return parse_qs(urlparse(location).query).get("token", [None])[0]
return None
async def _authenticate_test_user(client: httpx.AsyncClient) -> str:
response = await client.post(
"/auth/jwt/login",
data={"username": TEST_EMAIL, "password": TEST_PASSWORD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
"/auth/desktop/login",
json={"email": TEST_EMAIL, "password": TEST_PASSWORD},
)
token = _extract_access_token(response)
if token:
@ -89,9 +83,8 @@ async def _authenticate_test_user(client: httpx.AsyncClient) -> str:
)
response = await client.post(
"/auth/jwt/login",
data={"username": TEST_EMAIL, "password": TEST_PASSWORD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
"/auth/desktop/login",
json={"email": TEST_EMAIL, "password": TEST_PASSWORD},
)
token = _extract_access_token(response)
assert token, f"Login failed ({response.status_code}): {response.text}"

View file

@ -0,0 +1,88 @@
from __future__ import annotations
from types import SimpleNamespace
from fastapi import Request, Response
from app.auth.session_cookies import TransportMode, issue, read_refresh
from app.config import config
def _request_with_refresh_cookie(token: str) -> Request:
scope = {
"type": "http",
"method": "POST",
"path": "/auth/jwt/refresh",
"headers": [(b"cookie", f"{config.REFRESH_COOKIE_NAME}={token}".encode())],
"scheme": "https",
"server": ("testserver", 443),
}
return Request(scope)
def test_cookie_transport_sets_cookies_without_body_tokens():
response = Response()
body = issue(
response,
TransportMode.COOKIE,
access="access-token",
refresh="refresh-token",
access_expires_at=123,
)
assert "access_token" not in body
assert "refresh_token" not in body
assert body == {"authenticated": True, "access_expires_at": 123}
set_cookie_headers = response.headers.getlist("set-cookie")
assert any(config.SESSION_COOKIE_NAME in header for header in set_cookie_headers)
assert any(config.REFRESH_COOKIE_NAME in header for header in set_cookie_headers)
def test_cookie_transport_re_stamps_access_without_refresh_body_or_cookie():
response = Response()
body = issue(
response,
TransportMode.COOKIE,
access="access-token",
refresh=None,
access_expires_at=123,
)
assert "access_token" not in body
assert "refresh_token" not in body
set_cookie_headers = response.headers.getlist("set-cookie")
assert any(config.SESSION_COOKIE_NAME in header for header in set_cookie_headers)
assert not any(config.REFRESH_COOKIE_NAME in header for header in set_cookie_headers)
def test_header_transport_returns_body_tokens_without_cookies():
response = Response()
body = issue(
response,
TransportMode.HEADER,
access="access-token",
refresh="refresh-token",
access_expires_at=123,
)
assert body == {
"access_token": "access-token",
"refresh_token": "refresh-token",
"token_type": "bearer",
"access_expires_at": 123,
}
assert "set-cookie" not in response.headers
def test_read_refresh_cookie_source_wins_over_body_source():
request = _request_with_refresh_cookie("cookie-token")
refresh, mode = read_refresh(request, SimpleNamespace(refresh_token="body-token"))
assert refresh == "cookie-token"
assert mode is TransportMode.COOKIE

View file

@ -16,9 +16,8 @@ TEST_PASSWORD = "testpassword123"
async def get_auth_token(client: httpx.AsyncClient) -> str:
"""Log in and return a Bearer JWT token, registering the user first if needed."""
response = await client.post(
"/auth/jwt/login",
data={"username": TEST_EMAIL, "password": TEST_PASSWORD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
"/auth/desktop/login",
json={"email": TEST_EMAIL, "password": TEST_PASSWORD},
)
if response.status_code == 200:
return response.json()["access_token"]
@ -32,9 +31,8 @@ async def get_auth_token(client: httpx.AsyncClient) -> str:
)
response = await client.post(
"/auth/jwt/login",
data={"username": TEST_EMAIL, "password": TEST_PASSWORD},
headers={"Content-Type": "application/x-www-form-urlencoded"},
"/auth/desktop/login",
json={"email": TEST_EMAIL, "password": TEST_PASSWORD},
)
assert response.status_code == 200, (
f"Login after registration failed ({response.status_code}): {response.text}"