Coverage for src/cstlcore/auth/router.py: 35%

113 statements  

« prev     ^ index     » next       coverage.py v7.9.1, created at 2026-02-19 12:46 +0000

1import uuid 

2from datetime import datetime, timedelta 

3 

4import httpx 

5from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException 

6from fastapi.responses import RedirectResponse 

7from fastapi.security import OAuth2PasswordRequestForm 

8from loguru import logger 

9from sqlmodel import Session, select 

10 

11from cstlcore.settings import settings 

12from cstlcore.constellations.models import Constellation 

13from cstlcore.database.dependencies import get_session 

14from cstlcore.memberships.models import AccessEnum, ConstellationMembership 

15from cstlcore.models.password_reset_tokens import PasswordResetToken 

16from cstlcore.models.token import Token 

17from cstlcore.security.jwt import create_access_token 

18from cstlcore.security.password import get_password_hash, verify_password 

19from cstlcore.auth.services import send_password_reset_email 

20from cstlcore.users.dependencies import get_user_by_email 

21from cstlcore.users.models import User, UserCreate 

22 

23router = APIRouter() 

24 

25 

@router.get("/auth/google")
async def google_login():
    """
    Redirects to Google OAuth2 authorization endpoint.

    The query string is assembled with ``urllib.parse.urlencode`` so that each
    parameter value is percent-encoded. ``redirect_uri`` in particular is a
    URL itself and must not be spliced into another URL's query verbatim.

    Returns:
        RedirectResponse: redirect to ``settings.google_auth.auth_uri`` with
        ``response_type``, ``client_id``, ``redirect_uri`` and ``scope`` set.
    """
    # Function-scope import keeps this block self-contained.
    from urllib.parse import quote, urlencode

    query = urlencode(
        {
            "response_type": "code",
            "client_id": settings.google_auth.client_id,
            "redirect_uri": settings.google_auth.redirect_uri,
            "scope": "email profile",
        },
        # quote (not the default quote_plus) keeps the space in the scope
        # encoded as %20, exactly as the URL was written before.
        quote_via=quote,
    )
    return RedirectResponse(url=f"{settings.google_auth.auth_uri}?{query}")

39 

40 

@router.get("/auth/google/callback")
async def google_auth(code: str, session: Session = Depends(get_session)):
    """
    Handles the callback from Google OAuth2, exchanges the code for an access token,
    and retrieves user information.

    When no user with the returned email exists yet, a user, a default
    constellation and an owner membership are created in one transaction.
    The caller is then redirected to the landing page with a newly issued
    JWT in the query string.

    Args:
        code: Authorization code supplied by Google on the callback.
        session: Database session provided by ``get_session``.

    Returns:
        RedirectResponse: redirect to ``settings.landing_page_url`` carrying
        ``access_token`` and ``token_type`` query parameters.

    Raises:
        HTTPException: 400 when the code exchange fails, the token response
            has no access token, the user-info request fails, or the user
            info contains no email.
    """
    data = {
        "code": code,
        "client_id": settings.google_auth.client_id,
        "client_secret": settings.google_auth.client_secret,
        "redirect_uri": settings.google_auth.redirect_uri,
        "grant_type": "authorization_code",
    }
    async with httpx.AsyncClient() as client:
        # Exchange the authorization code for an access token
        response = await client.post(str(settings.google_auth.token_uri), data=data)
        if response.status_code != 200:
            logger.error(f"Failed to obtain access token: {response.text}")
            raise HTTPException(status_code=400, detail="Failed to obtain access token")
        access_token = response.json().get("access_token")
        if not access_token:
            logger.error("Access token not found in response")
            raise HTTPException(status_code=400, detail="Access token not found")

        # Retrieve user information using the access token
        user_info_response = await client.get(
            "https://www.googleapis.com/oauth2/v1/userinfo",
            headers={"Authorization": f"Bearer {access_token}"},
        )
        if user_info_response.status_code != 200:
            logger.error(f"Failed to retrieve user info: {user_info_response.text}")
            raise HTTPException(status_code=400, detail="Failed to retrieve user info")
        user_info = user_info_response.json()

    email = user_info.get("email")
    if not email:
        # Without this guard a missing email would be looked up (and possibly
        # stored) as None.
        logger.error("Email not found in Google user info")
        raise HTTPException(status_code=400, detail="Email not found in user info")

    db_user = session.exec(select(User).where(User.email == email)).first()
    if not db_user:
        # If user does not exist, create a new user
        # NOTE(review): email_verified is set unconditionally; consider
        # checking the verification flag in the Google user info first.
        db_user = User(
            id=uuid.uuid4(),
            email=email,
            name=user_info.get("name", "Unknown"),
            password_hash=get_password_hash(
                uuid.uuid4().hex
            ),  # Password is not used for OAuth users
            email_verified=True,
        )
        session.add(db_user)
        # Flush (not commit) so the user row is written first while the whole
        # sign-up stays in one transaction: a failure below no longer leaves
        # a user without a constellation.
        session.flush()

        # Give the new user a default constellation and owner membership
        constellation = Constellation(
            id=uuid.uuid4(),
            name="My Constellation",
            description="Your own universe!",
        )
        membership = ConstellationMembership(
            access=AccessEnum.OWNER,
            constellation_id=constellation.id,
            user_id=db_user.id,
        )
        session.add(constellation)
        session.add(membership)
        session.commit()
        session.refresh(db_user)
        logger.info(f"New user created: {db_user.email}")
        logger.info(f"New constellation created for user: {db_user.email}")
    else:
        logger.info(f"Existing user logged in: {db_user.email}")

    # Issue an access token for the user
    access_token_expiry = timedelta(hours=settings.jwt.access_token_expire_hours)
    access_token = create_access_token(
        data={"sub": db_user.id.hex},
        expires_delta=access_token_expiry,
    )
    # NOTE(review): the JWT travels in the redirect URL's query string, where
    # it can end up in browser history and logs; confirm this is acceptable.
    return RedirectResponse(
        url=f"{settings.landing_page_url}/?access_token={access_token}&token_type=bearer"
    )

120 

121 

@router.post("/auth/token", response_model=Token)
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
    session: Session = Depends(get_session),
):
    """
    Authenticates a user by email and password and issues a bearer token.

    The form's ``username`` field is matched against ``User.email``.

    Args:
        form_data: OAuth2 password form carrying ``username`` and ``password``.
        session: Database session provided by ``get_session``.

    Returns:
        dict: ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 401 with a ``WWW-Authenticate: Bearer`` header when the
            email is unknown or the password does not match.
    """
    db_user = session.exec(select(User).where(User.email == form_data.username)).first()
    # Both failure modes share one response so the reply does not reveal
    # which of the two checks failed.
    if not db_user or not verify_password(form_data.password, db_user.password_hash):
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            # A 401 response is expected to name the authentication scheme.
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token_expiry = timedelta(hours=settings.jwt.access_token_expire_hours)
    access_token = create_access_token(
        data={"sub": db_user.id.hex},
        expires_delta=access_token_expiry,
    )
    return {"access_token": access_token, "token_type": "bearer"}

139 

140 

@router.post("/auth/register", response_model=Token, status_code=201)
async def register_user(
    user: UserCreate,
    session: Session = Depends(get_session),
):
    """
    Registers a new user and returns a bearer token for them.

    Alongside the user, a default constellation and an owner membership
    linking the two are stored in the same commit.

    Args:
        user: Registration payload; its ``email`` and ``password`` are read here.
        session: Database session provided by ``get_session``.

    Returns:
        dict: ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 400 when a user with the same email already exists.
    """
    email_lookup = select(User).where(User.email == user.email)
    if session.exec(email_lookup).first():
        raise HTTPException(
            status_code=400, detail="Email already registered. Please log in."
        )

    # The plain password is never stored; only its hash goes onto the model.
    hashed = get_password_hash(user.password)
    db_user = User.model_validate(user, update={"password_hash": hashed})

    constellation = Constellation(
        id=uuid.uuid4(),
        name="My Constellation",
        description="Your own universe!",
    )
    # presumably db_user.id is populated by a model default at this point —
    # TODO confirm against the User model
    membership = ConstellationMembership(
        access=AccessEnum.OWNER,
        constellation_id=constellation.id,
        user_id=db_user.id,
    )

    for record in (db_user, constellation, membership):
        session.add(record)
    session.commit()

    token = create_access_token(
        data={"sub": db_user.id.hex},
        expires_delta=timedelta(hours=settings.jwt.access_token_expire_hours),
    )
    return {"access_token": token, "token_type": "bearer"}

174 

175 

@router.post("/auth/forgot-password")
async def forgot_password(
    background_tasks: BackgroundTasks,
    user: User = Depends(get_user_by_email),
    session: Session = Depends(get_session),
):
    """
    Creates a password reset token for the user and emails them a reset link.

    The token is stored with a 24-hour expiry before the email is queued, so
    the link is already valid when the message is sent.

    Args:
        background_tasks: Task queue used to send the email after the response.
        user: User resolved by the ``get_user_by_email`` dependency.
        session: Database session provided by ``get_session``.

    Returns:
        dict: ``{"ok": True}``.
    """
    # Function-scope import keeps this block self-contained.
    import secrets

    # Generate token and save to database first.
    # secrets is the stdlib module intended for security tokens;
    # token_hex(16) keeps the previous 32-hex-character format.
    reset_token = secrets.token_hex(16)
    # Kept timezone-naive on purpose: the expiry check elsewhere in this
    # module compares against a naive datetime.now().
    expires_at = datetime.now() + timedelta(hours=24)

    password_reset_token = PasswordResetToken(
        token=reset_token, user_id=user.id, expires_at=expires_at
    )
    session.add(password_reset_token)
    session.commit()

    # Add email sending to background tasks
    reset_link = (
        f"{settings.services.frontend}/login?mode=reset-password&token={reset_token}"
    )
    background_tasks.add_task(
        send_password_reset_email, user.email, user.name, reset_link
    )

    return {"ok": True}

201 

202 

@router.post("/auth/reset-password")
async def reset_password(
    token: str, new_password: str, session: Session = Depends(get_session)
):
    """
    Endpoint to reset the user's password.

    Looks up the reset token, rejects it when unknown or expired (an expired
    token is deleted), then stores the hash of the new password and deletes
    the token so it cannot be used again.

    Args:
        token: Reset token previously issued to the user.
        new_password: Plain-text replacement password.
        session: Database session provided by ``get_session``.

    Returns:
        dict: ``{"ok": True}``.

    Raises:
        HTTPException: 404 when the token is unknown or expired, or when the
            token's user no longer exists.
    """
    # NOTE(review): token and new_password are bare scalar parameters, which
    # presumably makes them query parameters on this POST route; confirm.
    token_query = select(PasswordResetToken).where(PasswordResetToken.token == token)
    stored_token = session.exec(token_query).first()
    if not stored_token:
        raise HTTPException(status_code=404, detail="Invalid or expired token")

    if datetime.now() > stored_token.expires_at:
        # Remove the stale token before reporting the failure.
        session.delete(stored_token)
        session.commit()
        raise HTTPException(status_code=404, detail="Invalid or expired token")

    owner_query = select(User).where(User.id == stored_token.user_id)
    db_user = session.exec(owner_query).first()
    if not db_user:
        raise HTTPException(status_code=404, detail="User not found")

    # Password change and token removal are committed together.
    db_user.password_hash = get_password_hash(new_password)
    session.add(db_user)
    session.delete(stored_token)
    session.commit()

    logger.info(f"Password reset successful for user {db_user.email}")
    return {"ok": True}