All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
266 lines
8.4 KiB
Python
266 lines
8.4 KiB
Python
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from sqlalchemy.orm import Session
|
|
from datetime import timedelta, datetime
|
|
from pydantic import BaseModel, EmailStr
|
|
import random
|
|
import string
|
|
from app.database.database import get_db
|
|
from app.models import User
|
|
from app.schemas.user import UserCreate, UserResponse, RequestPinRequest, ResetPasswordWithPinRequest, ChangePasswordRequest
|
|
from app.services.auth import (
|
|
authenticate_user,
|
|
create_access_token,
|
|
get_password_hash,
|
|
verify_token,
|
|
verify_password,
|
|
get_current_user,
|
|
)
|
|
from app.services.email import send_password_reset_pin, send_welcome_email
|
|
from app.config import settings
|
|
|
|
router = APIRouter(prefix="/api/auth", tags=["auth"])
|
|
|
|
|
|
class LoginRequest(BaseModel):
|
|
identifier: str # Can be email, username, or phone
|
|
password: str
|
|
|
|
|
|
class ForgotPasswordRequest(BaseModel):
|
|
email: EmailStr
|
|
|
|
|
|
class ResetPasswordRequest(BaseModel):
|
|
token: str
|
|
new_password: str
|
|
|
|
|
|
@router.post("/register", response_model=UserResponse)
|
|
def register(user: UserCreate, db: Session = Depends(get_db)):
|
|
db_user = db.query(User).filter(User.email == user.email).first()
|
|
if db_user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Email already registered",
|
|
)
|
|
|
|
# Check if username is already taken
|
|
if user.username:
|
|
existing_username = db.query(User).filter(User.username == user.username).first()
|
|
if existing_username:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Username already taken",
|
|
)
|
|
|
|
# Check if phone is already taken
|
|
if user.phone:
|
|
existing_phone = db.query(User).filter(User.phone == user.phone).first()
|
|
if existing_phone:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Phone number already registered",
|
|
)
|
|
|
|
hashed_password = get_password_hash(user.password)
|
|
db_user = User(
|
|
email=user.email,
|
|
username=user.username,
|
|
phone=user.phone,
|
|
full_name=user.full_name,
|
|
hashed_password=hashed_password,
|
|
)
|
|
db.add(db_user)
|
|
db.commit()
|
|
db.refresh(db_user)
|
|
|
|
# Send welcome email (non-blocking - don't fail registration if email fails)
|
|
try:
|
|
send_welcome_email(user.email, user.full_name)
|
|
except Exception as e:
|
|
print(f"Failed to send welcome email: {e}")
|
|
|
|
return db_user
|
|
|
|
|
|
@router.post("/login")
|
|
def login(request: LoginRequest, db: Session = Depends(get_db)):
|
|
user = authenticate_user(db, request.identifier, request.password)
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid credentials",
|
|
)
|
|
|
|
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
|
|
access_token = create_access_token(
|
|
data={"sub": str(user.id)}, expires_delta=access_token_expires
|
|
)
|
|
|
|
return {
|
|
"access_token": access_token,
|
|
"token_type": "bearer",
|
|
"user": UserResponse.from_orm(user),
|
|
}
|
|
|
|
|
|
@router.post("/verify-token")
|
|
def verify_token_endpoint(token: str):
|
|
user_id = verify_token(token)
|
|
if user_id is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="Invalid token",
|
|
)
|
|
return {"user_id": user_id, "valid": True}
|
|
|
|
|
|
@router.post("/forgot-password")
|
|
def forgot_password(request: ForgotPasswordRequest, db: Session = Depends(get_db)):
|
|
"""
|
|
Initiate password reset process.
|
|
In a production app, this would send an email with a reset link.
|
|
For now, we'll generate a reset token that can be used.
|
|
"""
|
|
user = db.query(User).filter(User.email == request.email).first()
|
|
if not user:
|
|
# Don't reveal if email exists or not for security
|
|
return {"message": "If the email exists, a reset link has been sent"}
|
|
|
|
# Create a password reset token (valid for 1 hour)
|
|
reset_token = create_access_token(
|
|
data={"sub": str(user.id), "type": "password_reset"},
|
|
expires_delta=timedelta(hours=1)
|
|
)
|
|
|
|
# In production, send this token via email
|
|
# For development, we'll just return it
|
|
print(f"Password reset token for {request.email}: {reset_token}")
|
|
|
|
return {
|
|
"message": "If the email exists, a reset link has been sent",
|
|
"reset_token": reset_token # Remove this in production
|
|
}
|
|
|
|
|
|
@router.post("/reset-password")
|
|
def reset_password(request: ResetPasswordRequest, db: Session = Depends(get_db)):
|
|
"""
|
|
Reset password using the token from forgot-password endpoint.
|
|
"""
|
|
user_id = verify_token(request.token)
|
|
if user_id is None:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invalid or expired reset token",
|
|
)
|
|
|
|
user = db.query(User).filter(User.id == int(user_id)).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="User not found",
|
|
)
|
|
|
|
# Update password
|
|
user.hashed_password = get_password_hash(request.new_password)
|
|
db.commit()
|
|
|
|
return {"message": "Password reset successful"}
|
|
|
|
|
|
@router.post("/request-reset-pin")
|
|
def request_reset_pin(request: RequestPinRequest, db: Session = Depends(get_db)):
|
|
"""
|
|
Send a 6-digit PIN to user's email for password reset.
|
|
"""
|
|
user = db.query(User).filter(User.email == request.email).first()
|
|
if not user:
|
|
# Don't reveal if email exists
|
|
return {"message": "If the email exists, a PIN has been sent"}
|
|
|
|
# Generate 6-digit PIN
|
|
pin = ''.join(random.choices(string.digits, k=6))
|
|
|
|
# Store PIN with 15 minute expiration
|
|
user.password_reset_pin = pin
|
|
user.pin_expires_at = datetime.utcnow() + timedelta(minutes=15)
|
|
db.commit()
|
|
|
|
# Send PIN via email
|
|
email_sent = send_password_reset_pin(request.email, pin, expires_minutes=15)
|
|
|
|
if not email_sent:
|
|
# If email sending fails, still print to console for development
|
|
print(f"\n⚠️ Email not sent. Password Reset PIN for {request.email}: {pin}")
|
|
print(f"Expires at: {user.pin_expires_at}\n")
|
|
|
|
return {
|
|
"message": "If the email exists, a PIN has been sent to your email",
|
|
}
|
|
|
|
|
|
@router.post("/reset-password-with-pin")
|
|
def reset_password_with_pin(request: ResetPasswordWithPinRequest, db: Session = Depends(get_db)):
|
|
"""
|
|
Reset password using the PIN sent to email.
|
|
"""
|
|
user = db.query(User).filter(User.email == request.email).first()
|
|
if not user:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invalid email or PIN",
|
|
)
|
|
|
|
# Check if PIN exists and not expired
|
|
if not user.password_reset_pin or not user.pin_expires_at:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="No PIN request found. Please request a new PIN.",
|
|
)
|
|
|
|
if datetime.utcnow() > user.pin_expires_at:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="PIN has expired. Please request a new one.",
|
|
)
|
|
|
|
if user.password_reset_pin != request.pin:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Invalid email or PIN",
|
|
)
|
|
|
|
# Update password and clear PIN
|
|
user.hashed_password = get_password_hash(request.new_password)
|
|
user.password_reset_pin = None
|
|
user.pin_expires_at = None
|
|
user.must_change_password = False # Clear forced password change flag
|
|
db.commit()
|
|
|
|
return {"message": "Password reset successful"}
|
|
|
|
|
|
@router.post("/change-password")
|
|
def change_password(
|
|
request: ChangePasswordRequest,
|
|
current_user: User = Depends(get_current_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""
|
|
Change password for logged-in user.
|
|
"""
|
|
# Verify old password
|
|
if not verify_password(request.old_password, current_user.hashed_password):
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="Current password is incorrect",
|
|
)
|
|
|
|
# Update to new password
|
|
current_user.hashed_password = get_password_hash(request.new_password)
|
|
current_user.must_change_password = False # Clear forced password change flag
|
|
db.commit()
|
|
|
|
return {"message": "Password changed successfully"}
|