Coverage for src/cstlcore/auth/router.py: 35%
113 statements
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
« prev ^ index » next coverage.py v7.9.1, created at 2026-02-19 12:46 +0000
1import uuid
2from datetime import datetime, timedelta
4import httpx
5from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
6from fastapi.responses import RedirectResponse
7from fastapi.security import OAuth2PasswordRequestForm
8from loguru import logger
9from sqlmodel import Session, select
11from cstlcore.settings import settings
12from cstlcore.constellations.models import Constellation
13from cstlcore.database.dependencies import get_session
14from cstlcore.memberships.models import AccessEnum, ConstellationMembership
15from cstlcore.models.password_reset_tokens import PasswordResetToken
16from cstlcore.models.token import Token
17from cstlcore.security.jwt import create_access_token
18from cstlcore.security.password import get_password_hash, verify_password
19from cstlcore.auth.services import send_password_reset_email
20from cstlcore.users.dependencies import get_user_by_email
21from cstlcore.users.models import User, UserCreate
23router = APIRouter()
26@router.get("/auth/google")
27async def google_login():
28 """
29 Redirects to Google OAuth2 authorization endpoint.
30 """
31 google_auth_url = (
32 f"{settings.google_auth.auth_uri}"
33 "?response_type=code"
34 f"&client_id={settings.google_auth.client_id}"
35 f"&redirect_uri={settings.google_auth.redirect_uri}"
36 "&scope=email%20profile"
37 )
38 return RedirectResponse(url=google_auth_url)
41@router.get("/auth/google/callback")
42async def google_auth(code: str, session: Session = Depends(get_session)):
43 """
44 Handles the callback **from** Google OAuth2, exchanges the code for an access token,
45 and retrieves user information.
46 """
47 data = {
48 "code": code,
49 "client_id": settings.google_auth.client_id,
50 "client_secret": settings.google_auth.client_secret,
51 "redirect_uri": settings.google_auth.redirect_uri,
52 "grant_type": "authorization_code",
53 }
54 async with httpx.AsyncClient() as client:
55 # Exchange the authorization code for an access token
56 response = await client.post(str(settings.google_auth.token_uri), data=data)
57 if response.status_code != 200:
58 logger.error(f"Failed to obtain access token: {response.text}")
59 raise HTTPException(status_code=400, detail="Failed to obtain access token")
60 access_token = response.json().get("access_token")
61 if not access_token:
62 logger.error("Access token not found in response")
63 raise HTTPException(status_code=400, detail="Access token not found")
65 # Retrieve user information using the access token
66 user_info_response = await client.get(
67 "https://www.googleapis.com/oauth2/v1/userinfo",
68 headers={"Authorization": f"Bearer {access_token}"},
69 )
70 if user_info_response.status_code != 200:
71 logger.error(f"Failed to retrieve user info: {user_info_response.text}")
72 raise HTTPException(status_code=400, detail="Failed to retrieve user info")
73 user_info = user_info_response.json()
74 email = user_info.get("email")
76 db_user = session.exec(select(User).where(User.email == email)).first()
77 if not db_user:
78 # If user does not exist, create a new user
79 db_user = User(
80 id=uuid.uuid4(),
81 email=email,
82 name=user_info.get("name", "Unknown"),
83 password_hash=get_password_hash(
84 uuid.uuid4().hex
85 ), # Password is not used for OAuth users
86 email_verified=True,
87 )
88 session.add(db_user)
89 session.commit()
90 session.refresh(db_user)
91 logger.info(f"New user created: {db_user.email}")
93 # TODO: Create a constellation and membership for the user
94 constellation = Constellation(
95 id=uuid.uuid4(),
96 name="My Constellation",
97 description="Your own universe!",
98 )
99 membership = ConstellationMembership(
100 access=AccessEnum.OWNER,
101 constellation_id=constellation.id,
102 user_id=db_user.id,
103 )
104 session.add(constellation)
105 session.add(membership)
106 session.commit()
107 logger.info(f"New constellation created for user: {db_user.email}")
108 else:
109 logger.info(f"Existing user logged in: {db_user.email}")
111 # Issue an access token for the user
112 access_token_expiry = timedelta(hours=settings.jwt.access_token_expire_hours)
113 access_token = create_access_token(
114 data={"sub": db_user.id.hex},
115 expires_delta=access_token_expiry,
116 )
117 return RedirectResponse(
118 url=f"{settings.landing_page_url}/?access_token={access_token}&token_type=bearer"
119 )
122@router.post("/auth/token", response_model=Token)
123async def login_for_access_token(
124 form_data: OAuth2PasswordRequestForm = Depends(),
125 session: Session = Depends(get_session),
126):
127 db_user = session.exec(select(User).where(User.email == form_data.username)).first()
128 if not db_user:
129 raise HTTPException(status_code=401, detail="Incorrect username or password")
130 if not verify_password(form_data.password, db_user.password_hash):
131 raise HTTPException(status_code=401, detail="Incorrect username or password")
133 access_token_expiry = timedelta(hours=settings.jwt.access_token_expire_hours)
134 access_token = create_access_token(
135 data={"sub": db_user.id.hex},
136 expires_delta=access_token_expiry,
137 )
138 return {"access_token": access_token, "token_type": "bearer"}
141@router.post("/auth/register", response_model=Token, status_code=201)
142async def register_user(
143 user: UserCreate,
144 session: Session = Depends(get_session),
145):
146 existing_user = session.exec(select(User).where(User.email == user.email)).first()
147 if existing_user:
148 raise HTTPException(
149 status_code=400, detail="Email already registered. Please log in."
150 )
152 password_hash = get_password_hash(user.password)
153 db_user = User.model_validate(user, update={"password_hash": password_hash})
155 constellation = Constellation(
156 id=uuid.uuid4(), name="My Constellation", description="Your own universe!"
157 )
159 membership = ConstellationMembership(
160 access=AccessEnum.OWNER, constellation_id=constellation.id, user_id=db_user.id
161 )
163 session.add(db_user)
164 session.add(constellation)
165 session.add(membership)
166 session.commit()
168 access_token_expiry = timedelta(hours=settings.jwt.access_token_expire_hours)
169 access_token = create_access_token(
170 data={"sub": db_user.id.hex},
171 expires_delta=access_token_expiry,
172 )
173 return {"access_token": access_token, "token_type": "bearer"}
176@router.post("/auth/forgot-password")
177async def forgot_password(
178 background_tasks: BackgroundTasks,
179 user: User = Depends(get_user_by_email),
180 session: Session = Depends(get_session),
181):
182 # Generate token and save to database first
183 reset_token = uuid.uuid4().hex
184 expires_at = datetime.now() + timedelta(hours=24)
186 password_reset_token = PasswordResetToken(
187 token=reset_token, user_id=user.id, expires_at=expires_at
188 )
189 session.add(password_reset_token)
190 session.commit()
192 # Add email sending to background tasks
193 reset_link = (
194 f"{settings.services.frontend}/login?mode=reset-password&token={reset_token}"
195 )
196 background_tasks.add_task(
197 send_password_reset_email, user.email, user.name, reset_link
198 )
200 return {"ok": True}
203@router.post("/auth/reset-password")
204async def reset_password(
205 token: str, new_password: str, session: Session = Depends(get_session)
206):
207 """
208 Endpoint to reset the user's password.
209 """
210 password_reset_token = session.exec(
211 select(PasswordResetToken).where(PasswordResetToken.token == token)
212 ).first()
214 if not password_reset_token:
215 raise HTTPException(status_code=404, detail="Invalid or expired token")
217 if password_reset_token.expires_at < datetime.now():
218 session.delete(password_reset_token)
219 session.commit()
220 raise HTTPException(status_code=404, detail="Invalid or expired token")
222 db_user = session.exec(
223 select(User).where(User.id == password_reset_token.user_id)
224 ).first()
226 if not db_user:
227 raise HTTPException(status_code=404, detail="User not found")
229 db_user.password_hash = get_password_hash(new_password)
230 session.add(db_user)
231 session.delete(password_reset_token)
232 session.commit()
234 logger.info(f"Password reset successful for user {db_user.email}")
235 return {"ok": True}