fix(authz):add zero context authorization checks

This commit is contained in:
Anish Sarkar 2026-06-23 12:53:44 +05:30
parent 08c1d12eb1
commit 7241a7a894
3 changed files with 136 additions and 0 deletions

View file

@ -0,0 +1,29 @@
"""Zero sync authentication context routes."""
from pydantic import BaseModel
from fastapi import APIRouter, Depends
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.context import AuthContext
from app.db import get_async_session
from app.users import get_auth_context
from app.utils.rbac import get_allowed_read_space_ids
router = APIRouter(prefix="/zero", tags=["zero"])
class ZeroContextResponse(BaseModel):
userId: str
allowedSpaceIds: list[int]
@router.get("/context", response_model=ZeroContextResponse)
async def get_zero_context(
auth: AuthContext = Depends(get_auth_context),
session: AsyncSession = Depends(get_async_session),
) -> ZeroContextResponse:
allowed_space_ids = await get_allowed_read_space_ids(session, auth)
return ZeroContextResponse(
userId=str(auth.user.id),
allowedSpaceIds=allowed_space_ids,
)

View file

@ -0,0 +1,85 @@
"""Regression tests for Zero's backend-computed authorization context."""
from __future__ import annotations
import pytest
from fastapi import HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from app.auth.context import AuthContext
from app.db import PersonalAccessToken, SearchSpace, User
from app.routes.search_spaces_routes import create_default_roles_and_membership
from app.utils.rbac import check_search_space_access, get_allowed_read_space_ids
pytestmark = pytest.mark.integration
def _pat_auth(user: User) -> AuthContext:
pat = PersonalAccessToken(
user_id=user.id,
user=user,
token_hash="1" * 64,
token_prefix="ss_pat_zero",
label="Zero PAT",
)
return AuthContext.pat_auth(user, pat)
async def _space_with_membership(
db_session: AsyncSession,
user: User,
*,
api_access_enabled: bool,
) -> SearchSpace:
space = SearchSpace(
name="Zero Authz Space",
user_id=user.id,
api_access_enabled=api_access_enabled,
)
db_session.add(space)
await db_session.flush()
await create_default_roles_and_membership(db_session, space.id, user.id)
await db_session.flush()
return space
async def test_zero_read_set_matches_session_search_space_access(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
disabled_space = await _space_with_membership(
db_session,
db_user,
api_access_enabled=False,
)
session_auth = AuthContext.session(db_user)
allowed_ids = set(await get_allowed_read_space_ids(db_session, session_auth))
for space in (db_search_space, disabled_space):
membership = await check_search_space_access(db_session, session_auth, space.id)
assert membership.search_space_id in allowed_ids
async def test_zero_read_set_applies_pat_api_access_gate(
db_session: AsyncSession,
db_user: User,
db_search_space: SearchSpace,
):
db_search_space.api_access_enabled = True
disabled_space = await _space_with_membership(
db_session,
db_user,
api_access_enabled=False,
)
await db_session.flush()
pat_auth = _pat_auth(db_user)
allowed_ids = set(await get_allowed_read_space_ids(db_session, pat_auth))
assert db_search_space.id in allowed_ids
assert disabled_space.id not in allowed_ids
with pytest.raises(HTTPException) as exc_info:
await check_search_space_access(db_session, pat_auth, disabled_space.id)
assert exc_info.value.status_code == 403

View file

@ -0,0 +1,22 @@
"""Static guards for Zero authorization wiring."""
from __future__ import annotations
from pathlib import Path
import pytest
pytestmark = pytest.mark.unit
REPO_ROOT = Path(__file__).resolve().parents[3]
WEB_ROOT = REPO_ROOT / "surfsense_web"
def test_zero_query_route_uses_authoritative_backend_context() -> None:
route = WEB_ROOT / "app/api/zero/query/route.ts"
text = route.read_text()
assert "/zero/context" in text
assert "/users/me" not in text
assert "userID: auth.ctx.userId" in text
assert "handleQueryRequest({" in text