dvirlabs 29aa5c2f36
All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
Add the option to login with username or phone or email and fix the leave messages
2026-05-08 18:54:26 +03:00

236 lines
7.3 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from datetime import timedelta, datetime
from pydantic import BaseModel, EmailStr
import random
import string
from app.database.database import get_db
from app.models import User
from app.schemas.user import UserCreate, UserResponse, RequestPinRequest, ResetPasswordWithPinRequest, ChangePasswordRequest
from app.services.auth import (
authenticate_user,
create_access_token,
get_password_hash,
verify_token,
verify_password,
get_current_user,
)
from app.config import settings
router = APIRouter(prefix="/api/auth", tags=["auth"])
class LoginRequest(BaseModel):
identifier: str # Can be email, username, or phone
password: str
class ForgotPasswordRequest(BaseModel):
email: EmailStr
class ResetPasswordRequest(BaseModel):
token: str
new_password: str
@router.post("/register", response_model=UserResponse)
def register(user: UserCreate, db: Session = Depends(get_db)):
db_user = db.query(User).filter(User.email == user.email).first()
if db_user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email already registered",
)
hashed_password = get_password_hash(user.password)
db_user = User(
email=user.email,
full_name=user.full_name,
hashed_password=hashed_password,
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
@router.post("/login")
def login(request: LoginRequest, db: Session = Depends(get_db)):
user = authenticate_user(db, request.identifier, request.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid credentials",
)
access_token_expires = timedelta(minutes=settings.access_token_expire_minutes)
access_token = create_access_token(
data={"sub": str(user.id)}, expires_delta=access_token_expires
)
return {
"access_token": access_token,
"token_type": "bearer",
"user": UserResponse.from_orm(user),
}
@router.post("/verify-token")
def verify_token_endpoint(token: str):
user_id = verify_token(token)
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token",
)
return {"user_id": user_id, "valid": True}
@router.post("/forgot-password")
def forgot_password(request: ForgotPasswordRequest, db: Session = Depends(get_db)):
"""
Initiate password reset process.
In a production app, this would send an email with a reset link.
For now, we'll generate a reset token that can be used.
"""
user = db.query(User).filter(User.email == request.email).first()
if not user:
# Don't reveal if email exists or not for security
return {"message": "If the email exists, a reset link has been sent"}
# Create a password reset token (valid for 1 hour)
reset_token = create_access_token(
data={"sub": str(user.id), "type": "password_reset"},
expires_delta=timedelta(hours=1)
)
# In production, send this token via email
# For development, we'll just return it
print(f"Password reset token for {request.email}: {reset_token}")
return {
"message": "If the email exists, a reset link has been sent",
"reset_token": reset_token # Remove this in production
}
@router.post("/reset-password")
def reset_password(request: ResetPasswordRequest, db: Session = Depends(get_db)):
"""
Reset password using the token from forgot-password endpoint.
"""
user_id = verify_token(request.token)
if user_id is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired reset token",
)
user = db.query(User).filter(User.id == int(user_id)).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="User not found",
)
# Update password
user.hashed_password = get_password_hash(request.new_password)
db.commit()
return {"message": "Password reset successful"}
@router.post("/request-reset-pin")
def request_reset_pin(request: RequestPinRequest, db: Session = Depends(get_db)):
"""
Send a 6-digit PIN to user's email for password reset.
"""
user = db.query(User).filter(User.email == request.email).first()
if not user:
# Don't reveal if email exists
return {"message": "If the email exists, a PIN has been sent"}
# Generate 6-digit PIN
pin = ''.join(random.choices(string.digits, k=6))
# Store PIN with 15 minute expiration
user.password_reset_pin = pin
user.pin_expires_at = datetime.utcnow() + timedelta(minutes=15)
db.commit()
# TODO: Send PIN via email
# For now, print it (REMOVE IN PRODUCTION)
print(f"\n✅ Password Reset PIN for {request.email}: {pin}")
print(f"Expires at: {user.pin_expires_at}\n")
return {
"message": "If the email exists, a PIN has been sent",
"pin": pin # REMOVE IN PRODUCTION - only for testing
}
@router.post("/reset-password-with-pin")
def reset_password_with_pin(request: ResetPasswordWithPinRequest, db: Session = Depends(get_db)):
"""
Reset password using the PIN sent to email.
"""
user = db.query(User).filter(User.email == request.email).first()
if not user:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid email or PIN",
)
# Check if PIN exists and not expired
if not user.password_reset_pin or not user.pin_expires_at:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="No PIN request found. Please request a new PIN.",
)
if datetime.utcnow() > user.pin_expires_at:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="PIN has expired. Please request a new one.",
)
if user.password_reset_pin != request.pin:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid email or PIN",
)
# Update password and clear PIN
user.hashed_password = get_password_hash(request.new_password)
user.password_reset_pin = None
user.pin_expires_at = None
user.must_change_password = False # Clear forced password change flag
db.commit()
return {"message": "Password reset successful"}
@router.post("/change-password")
def change_password(
request: ChangePasswordRequest,
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
"""
Change password for logged-in user.
"""
# Verify old password
if not verify_password(request.old_password, current_user.hashed_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Current password is incorrect",
)
# Update to new password
current_user.hashed_password = get_password_hash(request.new_password)
current_user.must_change_password = False # Clear forced password change flag
db.commit()
return {"message": "Password changed successfully"}