2025-12-17 00:15:27 +02:00

56 lines
2.2 KiB
Python

from app.db import get_db_connection
from app.auth.utils import hash_password, verify_password, create_access_token
from app.schemas import UserRegister, UserLogin, TokenResponse
import json
class AuthService:
    """Handle user registration and login against the ``users`` table."""

    @staticmethod
    def register(user_data: UserRegister) -> TokenResponse:
        """Register a new user, create its empty profile, and issue a token.

        The ``users`` row and the ``profiles`` row are written in a single
        transaction: either both are committed or neither is, so a failed
        profile insert cannot leave behind a user without a profile.

        Args:
            user_data: Registration payload; ``email``, ``password`` and
                ``display_name`` are read from it.

        Returns:
            A bearer ``TokenResponse`` carrying the new user's id.

        Raises:
            ValueError: If a user with the same email already exists.
        """
        with get_db_connection() as conn:
            cur = conn.cursor()
            try:
                # Check if email already exists.
                # NOTE(review): two concurrent registrations can both pass
                # this check; presumably a UNIQUE constraint on users.email
                # backs it up -- confirm against the schema.
                cur.execute(
                    "SELECT id FROM users WHERE email = %s",
                    (user_data.email,),
                )
                if cur.fetchone():
                    raise ValueError("Email already registered")

                # Hash password and create user.
                hashed_pwd = hash_password(user_data.password)
                cur.execute(
                    "INSERT INTO users (email, hashed_password) VALUES (%s, %s) RETURNING id",
                    (user_data.email, hashed_pwd),
                )
                user_id = cur.fetchone()[0]

                # Create the profile with placeholder values in the same
                # transaction as the user row (no commit in between).
                cur.execute(
                    """INSERT INTO profiles
                    (user_id, display_name, age, gender, location, bio, interests)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)""",
                    (
                        user_id,
                        user_data.display_name,
                        0,
                        "not_specified",
                        "",
                        "",
                        json.dumps([]),
                    ),
                )
                conn.commit()
            except Exception:
                # Discard any partial work before propagating the error.
                conn.rollback()
                raise
            finally:
                cur.close()

        # Generate token; the connection is not needed for this step.
        token = create_access_token(user_id, user_data.email)
        return TokenResponse(access_token=token, token_type="bearer", user_id=user_id)

    @staticmethod
    def login(user_data: UserLogin) -> TokenResponse:
        """Authenticate a user by email and password and return a token.

        Args:
            user_data: Login payload; ``email`` and ``password`` are read
                from it.

        Returns:
            A bearer ``TokenResponse`` carrying the authenticated user's id.

        Raises:
            ValueError: If no user has this email or the password does not
                verify. The message is the same in both cases.
        """
        with get_db_connection() as conn:
            cur = conn.cursor()
            try:
                cur.execute(
                    "SELECT id, hashed_password FROM users WHERE email = %s",
                    (user_data.email,),
                )
                row = cur.fetchone()
            finally:
                cur.close()

        # Verification and token creation only use the fetched row, so they
        # run after the connection has been released.
        if not row or not verify_password(user_data.password, row[1]):
            raise ValueError("Invalid email or password")

        user_id = row[0]
        token = create_access_token(user_id, user_data.email)
        return TokenResponse(access_token=token, token_type="bearer", user_id=user_id)