2026-05-04 01:01:36 +03:00

133 lines
3.9 KiB
Python

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from datetime import timedelta, datetime
from pydantic import BaseModel, EmailStr
from app.database.database import get_db
from app.models import User
from app.schemas.user import UserCreate, UserResponse
from app.services.auth import (
authenticate_user,
create_access_token,
get_password_hash,
verify_token,
)
from app.config import settings
# Router that groups every authentication endpoint under the /api/auth prefix.
router = APIRouter(prefix="/api/auth", tags=["auth"])
class ForgotPasswordRequest(BaseModel):
    # Request body accepted by the forgot-password endpoint.
    # The address is validated by pydantic's EmailStr type.
    email: EmailStr
class ResetPasswordRequest(BaseModel):
    # Request body accepted by the reset-password endpoint.

    # Token to be checked with verify_token() before the password is changed.
    token: str
    # Replacement password in plain text; it is hashed before being stored.
    new_password: str
@router.post("/register", response_model=UserResponse)
def register(user: UserCreate, db: Session = Depends(get_db)):
    """
    Create a new user account.

    Responds with HTTP 400 when a user with the same email already exists.
    Otherwise the password is hashed, the user is stored, and the persisted
    record is returned.
    """
    # Refuse duplicates before doing any hashing work.
    email_owner = db.query(User).filter(User.email == user.email).first()
    if email_owner:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email already registered",
        )

    # Only the hash of the password is ever placed on the model.
    new_user = User(
        email=user.email,
        full_name=user.full_name,
        hashed_password=get_password_hash(user.password),
    )

    db.add(new_user)
    db.commit()
    # Reload so that database-populated columns are present on the
    # returned object.
    db.refresh(new_user)
    return new_user
@router.post("/login")
def login(email: str, password: str, db: Session = Depends(get_db)):
    """
    Authenticate a user and issue a bearer access token.

    Responds with HTTP 401 when the credentials are rejected. On success the
    response carries the access token, the token type and the user's data.
    """
    # NOTE(review): `email` and `password` are plain `str` parameters, which
    # FastAPI reads from the query string. Credentials in a URL tend to end
    # up in access logs; consider moving them to the request body in a
    # coordinated API change.
    account = authenticate_user(db, email, password)
    if not account:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid credentials",
        )

    # Token lifetime comes from application settings.
    lifetime = timedelta(minutes=settings.access_token_expire_minutes)
    token = create_access_token(
        data={"sub": str(account.id)},
        expires_delta=lifetime,
    )

    payload = {
        "access_token": token,
        "token_type": "bearer",
        "user": UserResponse.from_orm(account),
    }
    return payload
@router.post("/verify-token")
def verify_token_endpoint(token: str):
    """
    Check a token and report the user id it resolves to.

    Responds with HTTP 401 when the token cannot be verified.
    """
    # NOTE(review): `token` is a plain `str` parameter, so FastAPI takes it
    # from the query string rather than from a header or the body.
    subject = verify_token(token)
    if subject is not None:
        return {"user_id": subject, "valid": True}

    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Invalid token",
    )
@router.post("/forgot-password")
def forgot_password(request: ForgotPasswordRequest, db: Session = Depends(get_db)):
    """
    Start the password reset flow for an email address.

    No email is sent yet. When the address belongs to a user, a reset token
    valid for one hour is generated, printed, and included in the response.
    """
    # Same wording for both outcomes so the message itself does not say
    # whether the address is registered.
    generic_message = "If the email exists, a reset link has been sent"

    account = db.query(User).filter(User.email == request.email).first()
    if not account:
        return {"message": generic_message}

    # The "type" claim marks this token as a password-reset token.
    reset_token = create_access_token(
        data={"sub": str(account.id), "type": "password_reset"},
        expires_delta=timedelta(hours=1),
    )

    # Development aid: the token is written to stdout instead of being mailed.
    print(f"Password reset token for {request.email}: {reset_token}")

    # NOTE(review): returning the token lets any caller who knows an email
    # address obtain a reset token for that account, and the extra key also
    # reveals that the address is registered. Remove "reset_token" from the
    # response once email delivery exists.
    return {
        "message": generic_message,
        "reset_token": reset_token,
    }
@router.post("/reset-password")
def reset_password(request: ResetPasswordRequest, db: Session = Depends(get_db)):
    """
    Reset a user's password using a token from the forgot-password endpoint.

    Responds with HTTP 400 when the new password is empty, when the token
    cannot be verified, or when the token's subject is not a numeric user id.
    Responds with HTTP 404 when the token is valid but no such user exists.
    On success the new password is hashed and stored.
    """
    # An empty string would otherwise be hashed and stored as the password.
    if not request.new_password:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="New password must not be empty",
        )

    # NOTE(review): only the user id comes back from verify_token here, so
    # this block does not itself check the "password_reset" type claim set
    # by forgot-password. Confirm verify_token (or a dedicated helper)
    # enforces it, otherwise a regular access token could be used to reset
    # a password.
    user_id = verify_token(request.token)
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or expired reset token",
        )

    # A subject that is not an integer is a bad token, not a server error.
    try:
        user_pk = int(user_id)
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or expired reset token",
        ) from None

    user = db.query(User).filter(User.id == user_pk).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="User not found",
        )

    # Store only the hash of the new password.
    user.hashed_password = get_password_hash(request.new_password)
    db.commit()
    return {"message": "Password reset successful"}