my-recipes/backend/auth_utils.py
2025-12-08 15:04:37 +02:00

97 lines
3.2 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
from jose import JWTError, jwt
import bcrypt
from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
import os
# Secret key for JWT (use environment variable in production)
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 days
security = HTTPBearer()
def hash_password(password: str) -> str:
"""Hash a password for storing."""
# Bcrypt has a 72 byte limit, truncate if necessary
password_bytes = password.encode('utf-8')[:72]
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password_bytes, salt)
return hashed.decode('utf-8')
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""Verify a stored password against one provided by user"""
# Bcrypt has a 72 byte limit, truncate if necessary
password_bytes = plain_password.encode('utf-8')[:72]
hashed_bytes = hashed_password.encode('utf-8')
return bcrypt.checkpw(password_bytes, hashed_bytes)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
"""Create JWT access token"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_token(token: str) -> dict:
"""Decode and verify JWT token"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return payload
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
headers={"WWW-Authenticate": "Bearer"},
)
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)) -> dict:
"""Get current user from JWT token (for protected routes)"""
from user_db_utils import get_user_by_id
token = credentials.credentials
payload = decode_token(token)
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid authentication credentials",
)
# Get full user info from database to include is_admin
user = get_user_by_id(int(user_id))
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
)
return {
"user_id": user["id"],
"username": user["username"],
"display_name": user["display_name"],
"is_admin": user.get("is_admin", False)
}
# Optional dependency - returns None if no token provided
def get_current_user_optional(credentials: Optional[HTTPAuthorizationCredentials] = Depends(security)) -> Optional[dict]:
"""Get current user if authenticated, otherwise None"""
if not credentials:
return None
try:
return get_current_user(credentials)
except HTTPException:
return None