mirror of
https://github.com/MODSetter/SurfSense.git
synced 2026-06-26 21:39:43 +02:00
128 lines
3.4 KiB
Python
128 lines
3.4 KiB
Python
"""Centralized session-cookie I/O for web authentication."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from datetime import UTC, datetime, timedelta
|
|
from enum import Enum
|
|
from typing import Any
|
|
|
|
import jwt
|
|
from fastapi import Request, Response
|
|
|
|
from app.config import config
|
|
|
|
|
|
class TransportMode(Enum):
|
|
COOKIE = "cookie"
|
|
HEADER = "header"
|
|
|
|
|
|
def _cookie_secure(request: Request | None = None) -> bool:
|
|
policy = config.SESSION_COOKIE_SECURE_POLICY
|
|
if policy == "always":
|
|
return True
|
|
if policy == "never":
|
|
return False
|
|
if request is not None:
|
|
proto = request.headers.get("x-forwarded-proto")
|
|
if proto:
|
|
return proto.split(",", 1)[0].strip().lower() == "https"
|
|
return request.url.scheme == "https"
|
|
return bool(config.BACKEND_URL and config.BACKEND_URL.startswith("https://"))
|
|
|
|
|
|
def _set_persistent_cookie(
|
|
response: Response,
|
|
*,
|
|
key: str,
|
|
value: str,
|
|
max_age: int,
|
|
request: Request | None,
|
|
) -> None:
|
|
expires = datetime.now(UTC) + timedelta(seconds=max_age)
|
|
response.set_cookie(
|
|
key=key,
|
|
value=value,
|
|
max_age=max_age,
|
|
expires=expires,
|
|
httponly=True,
|
|
secure=_cookie_secure(request),
|
|
samesite=config.SESSION_COOKIE_SAMESITE,
|
|
domain=config.COOKIE_DOMAIN,
|
|
path="/",
|
|
)
|
|
|
|
|
|
def write_session(
|
|
response: Response,
|
|
access: str,
|
|
refresh: str | None = None,
|
|
request: Request | None = None,
|
|
) -> None:
|
|
_set_persistent_cookie(
|
|
response,
|
|
key=config.SESSION_COOKIE_NAME,
|
|
value=access,
|
|
max_age=config.ACCESS_TOKEN_LIFETIME_SECONDS,
|
|
request=request,
|
|
)
|
|
if refresh is not None:
|
|
_set_persistent_cookie(
|
|
response,
|
|
key=config.REFRESH_COOKIE_NAME,
|
|
value=refresh,
|
|
max_age=config.REFRESH_TOKEN_LIFETIME_SECONDS,
|
|
request=request,
|
|
)
|
|
|
|
|
|
def clear_session(response: Response, request: Request | None = None) -> None:
|
|
for key in (config.SESSION_COOKIE_NAME, config.REFRESH_COOKIE_NAME):
|
|
response.delete_cookie(
|
|
key=key,
|
|
path="/",
|
|
domain=config.COOKIE_DOMAIN,
|
|
secure=_cookie_secure(request),
|
|
samesite=config.SESSION_COOKIE_SAMESITE,
|
|
httponly=True,
|
|
)
|
|
|
|
|
|
def read_refresh(request: Request, body: Any | None = None) -> tuple[str | None, TransportMode]:
|
|
cookie = request.cookies.get(config.REFRESH_COOKIE_NAME)
|
|
if cookie:
|
|
return cookie, TransportMode.COOKIE
|
|
if body is None:
|
|
return None, TransportMode.HEADER
|
|
return getattr(body, "refresh_token", None), TransportMode.HEADER
|
|
|
|
|
|
def access_expires_at(access_token: str) -> int:
|
|
payload = jwt.decode(
|
|
access_token,
|
|
config.SECRET_KEY,
|
|
algorithms=["HS256"],
|
|
options={"verify_aud": False},
|
|
)
|
|
return int(payload["exp"])
|
|
|
|
|
|
def issue(
|
|
response: Response,
|
|
mode: TransportMode,
|
|
*,
|
|
access: str,
|
|
refresh: str | None,
|
|
access_expires_at: int,
|
|
request: Request | None = None,
|
|
) -> dict:
|
|
if mode is TransportMode.COOKIE:
|
|
write_session(response, access, refresh, request)
|
|
return {"authenticated": True, "access_expires_at": access_expires_at}
|
|
|
|
return {
|
|
"access_token": access,
|
|
"refresh_token": refresh,
|
|
"token_type": "bearer",
|
|
"access_expires_at": access_expires_at,
|
|
}
|