refactor: update authentication error handling to prevent user enumeration and improve error messages

This commit is contained in:
Anish Sarkar 2026-02-09 12:57:32 +05:30
parent 2add106296
commit c1016591da
3 changed files with 14 additions and 44 deletions

View file

@ -2,7 +2,7 @@ import logging
import uuid
import httpx
from fastapi import Depends, HTTPException, Request, Response
from fastapi import Depends, Request, Response
from fastapi.responses import JSONResponse, RedirectResponse
from fastapi_users import BaseUserManager, FastAPIUsers, UUIDIDMixin, models
from fastapi_users.authentication import (
@ -47,6 +47,14 @@ if config.AUTH_TYPE == "GOOGLE":
class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
"""
Custom user manager extending fastapi-users BaseUserManager.
Authentication returns a generic error for both non-existent accounts
and incorrect passwords to comply with OWASP WSTG-IDNT-04 and
prevent user enumeration attacks.
"""
reset_password_token_secret = SECRET
verification_token_secret = SECRET
@ -180,36 +188,6 @@ class UserManager(UUIDIDMixin, BaseUserManager[User, uuid.UUID]):
):
print(f"Verification requested for user {user.id}. Verification token: {token}")
async def authenticate(self, credentials):
"""
Override to return a specific error when user account doesn't exist,
instead of a generic LOGIN_BAD_CREDENTIALS.
"""
from fastapi_users.exceptions import UserNotExists
try:
user = await self.get_by_email(credentials.username)
except UserNotExists:
# Still hash the password to mitigate timing attacks
self.password_helper.hash(credentials.password)
logger.warning(
f"Login attempt for non-existent account: {credentials.username}"
)
raise HTTPException(
status_code=400,
detail="LOGIN_USER_NOT_FOUND",
) from None
verified, updated_password_hash = self.password_helper.verify_and_update(
credentials.password, user.hashed_password
)
if not verified:
logger.warning(f"Failed login attempt (wrong password) for user: {user.id}")
return None
if updated_password_hash is not None:
await self.user_db.update(user, {"hashed_password": updated_password_hash})
return user
async def get_user_manager(user_db: SQLAlchemyUserDatabase = Depends(get_user_db)):
yield UserManager(user_db)