Coverage for src/cstlcore/auth/dependencies.py: 69%

35 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-02-19 12:46 +0000

1import uuid 

2 

3import jwt 

4from fastapi import Depends, HTTPException, status 

5from fastapi.security import OAuth2PasswordBearer 

6from loguru import logger 

7from sqlmodel import Session 

8 

9from cstlcore.database.dependencies import get_session 

10from cstlcore.security.jwt import decode_access_token 

11from cstlcore.users.models import User 

12 

13oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token") 

14 

15 

def get_current_user(
    token: str = Depends(oauth2_scheme), session: Session = Depends(get_session)
) -> User:
    """
    Resolve the authenticated user from a bearer access token.

    The token is decoded with ``decode_access_token``, its ``sub`` claim is
    parsed as a UUID, and the ``User`` with that identifier is fetched through
    the given session.

    Args:
        token: Bearer token supplied by ``oauth2_scheme``.
        session: Database session supplied by ``get_session``.

    Raises:
        HTTPException: 401 if the token is rejected with
            ``jwt.InvalidTokenError``, if it carries no ``sub`` claim, or if
            no user is found for that identifier. 400 if any other error
            occurs while decoding the token or parsing the subject.

    Returns:
        User: The user object corresponding to the token's subject.
    """

    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = decode_access_token(token)
        sub = payload.get("sub")
        if sub is None:
            raise credentials_exception
        user_id = uuid.UUID(sub)
    except HTTPException:
        # HTTPException derives from Exception, so without this clause the
        # 401 raised above for a missing "sub" would fall into the generic
        # handler below and be reported as a 400 decoding error instead.
        raise
    except jwt.InvalidTokenError:
        raise credentials_exception
    except Exception as e:
        logger.error(f"Token decoding error: {e}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Token decoding error",
        ) from e

    db_user = session.get(User, user_id)
    if not db_user:
        # A well-formed token whose subject no longer maps to a user is
        # treated the same as an invalid credential.
        raise credentials_exception
    return db_user

52 

def is_user_admin(user: User) -> bool:
    """
    Report whether the given user counts as an administrator.

    The decision is delegated to ``settings.admin.is_admin_email``, which is
    called with the user's email address.

    Args:
        user: The User object to check

    Returns:
        bool: The result of ``settings.admin.is_admin_email`` for the
        user's email
    """
    # Imported at call time rather than at module import time
    # (presumably to avoid a circular import -- TODO confirm).
    from cstlcore.settings import settings

    admin_settings = settings.admin
    return admin_settings.is_admin_email(user.email)

65 

66 

def require_admin_user(current_user: User = Depends(get_current_user)) -> User:
    """
    Dependency that only lets administrators through.

    The authenticated user's email is checked with
    ``settings.admin.is_admin_email`` (the admin emails are configured via
    the environment variable ADMIN__EMAILS).

    Raises:
        HTTPException: 403 if the user's email is not an admin email.

    Returns:
        User: The authenticated user, unchanged, when authorized.
    """
    from cstlcore.settings import settings

    authorized = settings.admin.is_admin_email(current_user.email)
    if not authorized:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Admin privileges required.",
        )
    return current_user