test(auth): cover PAT fail-closed authorization

This commit is contained in:
Anish Sarkar 2026-06-20 02:13:05 +05:30
parent cf840875c9
commit b3fa96bef8
2 changed files with 172 additions and 0 deletions

View file

@ -0,0 +1,71 @@
"""Runtime smoke tests for fail-closed PAT authorization primitives."""
from __future__ import annotations
import pytest
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.context import AuthContext
from app.db import PersonalAccessToken, SearchSpace, User
from app.users import allow_any_principal, require_session_context
from app.utils.rbac import check_search_space_access
pytestmark = pytest.mark.integration
def _pat_auth(user: User) -> AuthContext:
pat = PersonalAccessToken(
user_id=user.id,
user=user,
token_hash="0" * 64,
token_prefix="ss_pat_test",
label="Test PAT",
)
return AuthContext.pat_auth(user, pat)
async def test_pat_is_rejected_by_session_only_dependency(db_user: User):
auth = _pat_auth(db_user)
with pytest.raises(HTTPException) as exc_info:
await require_session_context(auth=auth)
assert exc_info.value.status_code == 403
async def test_pat_is_allowed_by_bootstrap_dependency(db_user: User):
auth = _pat_auth(db_user)
assert await allow_any_principal(auth=auth) is auth
async def test_pat_is_rejected_for_api_disabled_space(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
db_search_space.api_access_enabled = False
await db_session.flush()
auth = _pat_auth(db_user)
with pytest.raises(HTTPException) as exc_info:
await check_search_space_access(db_session, auth, db_search_space.id)
assert exc_info.value.status_code == 403
assert exc_info.value.detail == "API access is not enabled for this search space."
async def test_pat_is_allowed_for_api_enabled_space(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
db_search_space.api_access_enabled = True
await db_session.flush()
auth = _pat_auth(db_user)
membership = await check_search_space_access(db_session, auth, db_search_space.id)
assert membership.user_id == db_user.id
assert membership.search_space_id == db_search_space.id

View file

@ -0,0 +1,101 @@
"""Static guards for the fail-closed PAT authorization model."""
from __future__ import annotations
from pathlib import Path
import pytest
pytestmark = pytest.mark.unit
BACKEND_ROOT = Path(__file__).resolve().parents[2]
APP_ROOT = BACKEND_ROOT / "app"
ALLOW_ANY_EXPECTED = {
"app.py": "auth: AuthContext = Depends(allow_any_principal)",
"routes/obsidian_plugin_routes.py": (
"_auth: AuthContext = Depends(allow_any_principal)"
),
"routes/search_spaces_routes.py": (
"auth: AuthContext = Depends(allow_any_principal)"
),
}
CONNECTOR_LISTERS = [
"routes/slack_add_connector_route.py",
"routes/composio_routes.py",
"routes/google_drive_add_connector_route.py",
"routes/discord_add_connector_route.py",
"routes/dropbox_add_connector_route.py",
"routes/onedrive_add_connector_route.py",
]
def _python_files() -> list[Path]:
return [
path
for path in APP_ROOT.rglob("*.py")
if "__pycache__" not in path.parts
]
def test_current_active_user_is_removed_from_app_tree() -> None:
offenders = [
str(path.relative_to(BACKEND_ROOT))
for path in _python_files()
if "current_active_user" in path.read_text()
]
assert offenders == []
def test_allow_any_principal_is_only_used_by_bootstrap_allowlist() -> None:
actual: dict[str, int] = {}
for path in _python_files():
text = path.read_text()
count = text.count("Depends(allow_any_principal)")
if count:
actual[str(path.relative_to(APP_ROOT))] = count
assert actual == dict.fromkeys(ALLOW_ANY_EXPECTED, 1)
for rel_path, expected_snippet in ALLOW_ANY_EXPECTED.items():
text = (APP_ROOT / rel_path).read_text()
assert expected_snippet in text
def test_connector_listers_route_pat_through_search_space_gate() -> None:
for rel_path in CONNECTOR_LISTERS:
text = (APP_ROOT / rel_path).read_text()
assert "auth: AuthContext = Depends(get_auth_context)" in text, rel_path
assert (
"await check_search_space_access(session, auth, connector.search_space_id)"
in text
), rel_path
def test_identity_routes_are_session_only() -> None:
session_only_files = [
"routes/prompts_routes.py",
"routes/memory_routes.py",
"routes/model_list_routes.py",
"routes/agent_flags_route.py",
"routes/youtube_routes.py",
"routes/incentive_tasks_routes.py",
"notifications/api/api.py",
"routes/chat_comments_routes.py",
"routes/public_chat_routes.py",
]
for rel_path in session_only_files:
text = (APP_ROOT / rel_path).read_text()
assert "require_session_context" in text, rel_path
assert "Depends(get_auth_context)" not in text, rel_path
def test_model_connection_personal_writes_default_to_session_required() -> None:
text = (APP_ROOT / "routes/model_connections_routes.py").read_text()
assert "allow_spaceless_pat: bool = False" in text
assert "auth.is_gated and not allow_spaceless_pat" in text
assert "Managing personal model connections requires an interactive session" in text