from fastapi import APIRouter, Depends, HTTPException, status from sqlalchemy.orm import Session from datetime import timedelta, datetime from pydantic import BaseModel, EmailStr from app.database.database import get_db from app.models import User from app.schemas.user import UserCreate, UserResponse from app.services.auth import ( authenticate_user, create_access_token, get_password_hash, verify_token, ) from app.config import settings router = APIRouter(prefix="/api/auth", tags=["auth"]) class ForgotPasswordRequest(BaseModel): email: EmailStr class ResetPasswordRequest(BaseModel): token: str new_password: str @router.post("/register", response_model=UserResponse) def register(user: UserCreate, db: Session = Depends(get_db)): db_user = db.query(User).filter(User.email == user.email).first() if db_user: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Email already registered", ) hashed_password = get_password_hash(user.password) db_user = User( email=user.email, full_name=user.full_name, hashed_password=hashed_password, ) db.add(db_user) db.commit() db.refresh(db_user) return db_user @router.post("/login") def login(email: str, password: str, db: Session = Depends(get_db)): user = authenticate_user(db, email, password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid credentials", ) access_token_expires = timedelta(minutes=settings.access_token_expire_minutes) access_token = create_access_token( data={"sub": str(user.id)}, expires_delta=access_token_expires ) return { "access_token": access_token, "token_type": "bearer", "user": UserResponse.from_orm(user), } @router.post("/verify-token") def verify_token_endpoint(token: str): user_id = verify_token(token) if user_id is None: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token", ) return {"user_id": user_id, "valid": True} @router.post("/forgot-password") def forgot_password(request: ForgotPasswordRequest, db: Session = Depends(get_db)): """ Initiate password reset process. In a production app, this would send an email with a reset link. For now, we'll generate a reset token that can be used. """ user = db.query(User).filter(User.email == request.email).first() if not user: # Don't reveal if email exists or not for security return {"message": "If the email exists, a reset link has been sent"} # Create a password reset token (valid for 1 hour) reset_token = create_access_token( data={"sub": str(user.id), "type": "password_reset"}, expires_delta=timedelta(hours=1) ) # In production, send this token via email # For development, we'll just return it print(f"Password reset token for {request.email}: {reset_token}") return { "message": "If the email exists, a reset link has been sent", "reset_token": reset_token # Remove this in production } @router.post("/reset-password") def reset_password(request: ResetPasswordRequest, db: Session = Depends(get_db)): """ Reset password using the token from forgot-password endpoint. """ user_id = verify_token(request.token) if user_id is None: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid or expired reset token", ) user = db.query(User).filter(User.id == int(user_id)).first() if not user: raise HTTPException( status_code=status.HTTP_404_NOT_FOUND, detail="User not found", ) # Update password user.hashed_password = get_password_hash(request.new_password) db.commit() return {"message": "Password reset successful"}