46 lines
1.4 KiB
Python
46 lines
1.4 KiB
Python
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from sqlalchemy.orm import Session
|
|
from app.core.security import decode_access_token
|
|
from app.db.base import get_db
|
|
from app.models.user import User
|
|
|
|
# Shared HTTP Bearer scheme instance; injected through Depends() in
# get_current_user to obtain the request's bearer credentials.
security = HTTPBearer()
|
|
|
|
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(security),
    db: Session = Depends(get_db)
) -> User:
    """Resolve the request's bearer token to the matching ``User`` row.

    The token is decoded with ``decode_access_token``; its ``sub`` claim is
    interpreted as the integer primary key of the user to load.

    Args:
        credentials: Bearer credentials supplied by the ``security`` scheme.
        db: SQLAlchemy session supplied by ``get_db``.

    Returns:
        The ``User`` whose ``id`` equals the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token does not decode, has no ``sub``
            claim, carries a ``sub`` that is not an integer, or refers to a
            user that does not exist.
    """
    token = credentials.credentials
    payload = decode_access_token(token)

    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )

    user_id = payload.get("sub")
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        )

    # A token may decode fine yet carry a non-numeric subject; treat that as
    # bad credentials rather than letting int() surface as a 500 error.
    try:
        user_pk = int(user_id)
    except (TypeError, ValueError):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication credentials"
        ) from None

    user = db.query(User).filter(User.id == user_pk).first()
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found"
        )

    return user
|
|
|
|
async def get_current_admin_user(current_user: User = Depends(get_current_user)) -> User:
    """Allow the request through only for users whose role is ``"admin"``.

    Args:
        current_user: The user resolved by ``get_current_user``.

    Returns:
        The same ``current_user`` object, unchanged, when it is an admin.

    Raises:
        HTTPException: 403 when ``current_user.role`` is not ``"admin"``.
    """
    if current_user.role == "admin":
        return current_user

    # Authenticated but not privileged: forbidden, not unauthorized.
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Not enough permissions",
    )
|