From b3fa96bef83074151d3abe130fe8385312dc18a7 Mon Sep 17 00:00:00 2001 From: Anish Sarkar <104695310+AnishSarkar22@users.noreply.github.com> Date: Sat, 20 Jun 2026 02:13:05 +0530 Subject: [PATCH] test(auth): cover PAT fail-closed authorization --- .../integration/test_pat_fail_closed_authz.py | 71 ++++++++++++ .../tests/unit/test_pat_fail_closed_static.py | 101 ++++++++++++++++++ 2 files changed, 172 insertions(+) create mode 100644 surfsense_backend/tests/integration/test_pat_fail_closed_authz.py create mode 100644 surfsense_backend/tests/unit/test_pat_fail_closed_static.py diff --git a/surfsense_backend/tests/integration/test_pat_fail_closed_authz.py b/surfsense_backend/tests/integration/test_pat_fail_closed_authz.py new file mode 100644 index 000000000..5bec3f48a --- /dev/null +++ b/surfsense_backend/tests/integration/test_pat_fail_closed_authz.py @@ -0,0 +1,71 @@ +"""Runtime smoke tests for fail-closed PAT authorization primitives.""" + +from __future__ import annotations + +import pytest +from fastapi import HTTPException +from sqlalchemy.ext.asyncio import AsyncSession + +from app.auth.context import AuthContext +from app.db import PersonalAccessToken, SearchSpace, User +from app.users import allow_any_principal, require_session_context +from app.utils.rbac import check_search_space_access + +pytestmark = pytest.mark.integration + + +def _pat_auth(user: User) -> AuthContext: + pat = PersonalAccessToken( + user_id=user.id, + user=user, + token_hash="0" * 64, + token_prefix="ss_pat_test", + label="Test PAT", + ) + return AuthContext.pat_auth(user, pat) + + +async def test_pat_is_rejected_by_session_only_dependency(db_user: User): + auth = _pat_auth(db_user) + + with pytest.raises(HTTPException) as exc_info: + await require_session_context(auth=auth) + + assert exc_info.value.status_code == 403 + + +async def test_pat_is_allowed_by_bootstrap_dependency(db_user: User): + auth = _pat_auth(db_user) + + assert await allow_any_principal(auth=auth) is auth + + +async def test_pat_is_rejected_for_api_disabled_space( + db_session: AsyncSession, + db_user: User, + db_search_space: SearchSpace, +): + db_search_space.api_access_enabled = False + await db_session.flush() + auth = _pat_auth(db_user) + + with pytest.raises(HTTPException) as exc_info: + await check_search_space_access(db_session, auth, db_search_space.id) + + assert exc_info.value.status_code == 403 + assert exc_info.value.detail == "API access is not enabled for this search space." + + +async def test_pat_is_allowed_for_api_enabled_space( + db_session: AsyncSession, + db_user: User, + db_search_space: SearchSpace, +): + db_search_space.api_access_enabled = True + await db_session.flush() + auth = _pat_auth(db_user) + + membership = await check_search_space_access(db_session, auth, db_search_space.id) + + assert membership.user_id == db_user.id + assert membership.search_space_id == db_search_space.id diff --git a/surfsense_backend/tests/unit/test_pat_fail_closed_static.py b/surfsense_backend/tests/unit/test_pat_fail_closed_static.py new file mode 100644 index 000000000..01ecd918f --- /dev/null +++ b/surfsense_backend/tests/unit/test_pat_fail_closed_static.py @@ -0,0 +1,101 @@ +"""Static guards for the fail-closed PAT authorization model.""" + +from __future__ import annotations + +from pathlib import Path + +import pytest + +pytestmark = pytest.mark.unit + +BACKEND_ROOT = Path(__file__).resolve().parents[2] +APP_ROOT = BACKEND_ROOT / "app" + +ALLOW_ANY_EXPECTED = { + "app.py": "auth: AuthContext = Depends(allow_any_principal)", + "routes/obsidian_plugin_routes.py": ( + "_auth: AuthContext = Depends(allow_any_principal)" + ), + "routes/search_spaces_routes.py": ( + "auth: AuthContext = Depends(allow_any_principal)" + ), +} + +CONNECTOR_LISTERS = [ + "routes/slack_add_connector_route.py", + "routes/composio_routes.py", + "routes/google_drive_add_connector_route.py", + "routes/discord_add_connector_route.py", + "routes/dropbox_add_connector_route.py", + "routes/onedrive_add_connector_route.py", +] + + +def _python_files() -> list[Path]: + return [ + path + for path in APP_ROOT.rglob("*.py") + if "__pycache__" not in path.parts + ] + + +def test_current_active_user_is_removed_from_app_tree() -> None: + offenders = [ + str(path.relative_to(BACKEND_ROOT)) + for path in _python_files() + if "current_active_user" in path.read_text() + ] + + assert offenders == [] + + +def test_allow_any_principal_is_only_used_by_bootstrap_allowlist() -> None: + actual: dict[str, int] = {} + for path in _python_files(): + text = path.read_text() + count = text.count("Depends(allow_any_principal)") + if count: + actual[str(path.relative_to(APP_ROOT))] = count + + assert actual == dict.fromkeys(ALLOW_ANY_EXPECTED, 1) + + for rel_path, expected_snippet in ALLOW_ANY_EXPECTED.items(): + text = (APP_ROOT / rel_path).read_text() + assert expected_snippet in text + + +def test_connector_listers_route_pat_through_search_space_gate() -> None: + for rel_path in CONNECTOR_LISTERS: + text = (APP_ROOT / rel_path).read_text() + assert "auth: AuthContext = Depends(get_auth_context)" in text, rel_path + assert ( + "await check_search_space_access(session, auth, connector.search_space_id)" + in text + ), rel_path + + +def test_identity_routes_are_session_only() -> None: + session_only_files = [ + "routes/prompts_routes.py", + "routes/memory_routes.py", + "routes/model_list_routes.py", + "routes/agent_flags_route.py", + "routes/youtube_routes.py", + "routes/incentive_tasks_routes.py", + "notifications/api/api.py", + "routes/chat_comments_routes.py", + "routes/public_chat_routes.py", + ] + + for rel_path in session_only_files: + text = (APP_ROOT / rel_path).read_text() + assert "require_session_context" in text, rel_path + assert "Depends(get_auth_context)" not in text, rel_path + + +def test_model_connection_personal_writes_default_to_session_required() -> None: + text = (APP_ROOT / "routes/model_connections_routes.py").read_text() + + assert "allow_spaceless_pat: bool = False" in text + assert "auth.is_gated and not allow_spaceless_pat" in text + assert "Managing personal model connections requires an interactive session" in text