Coverage for src/cstlcore/auth/dependencies.py: 69%
35 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
1import uuid
3import jwt
4from fastapi import Depends, HTTPException, status
5from fastapi.security import OAuth2PasswordBearer
6from loguru import logger
7from sqlmodel import Session
9from cstlcore.database.dependencies import get_session
10from cstlcore.security.jwt import decode_access_token
11from cstlcore.users.models import User
13oauth2_scheme = OAuth2PasswordBearer(tokenUrl="auth/token")
16def get_current_user(
17 token: str = Depends(oauth2_scheme), session: Session = Depends(get_session)
18) -> User:
19 """
20 Get the current user from the access token.
21 This function decodes the JWT token, retrieves the user ID from the token,
22 and fetches the user from the database.
23 Raises:
24 HTTPException: If the token is invalid or the user does not exist.
25 Returns:
26 User: The user object corresponding to the token's subject.
27 """
29 credentials_exception = HTTPException(
30 status_code=status.HTTP_401_UNAUTHORIZED,
31 detail="Could not validate credentials",
32 headers={"WWW-Authenticate": "Bearer"},
33 )
34 try:
35 payload = decode_access_token(token)
36 sub = payload.get("sub")
37 if sub is None:
38 raise credentials_exception
39 user_id = uuid.UUID(sub)
40 except jwt.InvalidTokenError:
41 raise credentials_exception
42 except Exception as e:
43 logger.error(f"Token decoding error: {e}")
44 raise HTTPException(
45 status_code=status.HTTP_400_BAD_REQUEST,
46 detail="Token decoding error",
47 )
48 db_user = session.get(User, user_id)
49 if not db_user:
50 raise credentials_exception
51 return db_user
53def is_user_admin(user: User) -> bool:
54 """
55 Check if a user has admin privileges based on their email.
57 Args:
58 user: The User object to check
60 Returns:
61 bool: True if the user's email is in the admin list, False otherwise
62 """
63 from cstlcore.settings import settings
64 return settings.admin.is_admin_email(user.email)
67def require_admin_user(current_user: User = Depends(get_current_user)) -> User:
68 """
69 Ensure the current user has admin privileges by checking their email
70 against the configured admin emails in the environment variable ADMIN__EMAILS.
71 Returns the user if authorized; otherwise raises 403.
72 """
73 from cstlcore.settings import settings
75 if settings.admin.is_admin_email(current_user.email):
76 return current_user
77 raise HTTPException(
78 status_code=status.HTTP_403_FORBIDDEN,
79 detail="Admin privileges required.",
80 )