66 lines
2.7 KiB
Python
66 lines
2.7 KiB
Python
from app.db import get_db_connection
|
|
from app.auth.utils import hash_password, verify_password, create_access_token
|
|
from app.schemas import UserRegister, UserLogin, TokenResponse
|
|
import json
|
|
|
|
class AuthService:
    """Handle user authentication: account registration and login.

    Both operations are static methods that obtain their own connection from
    ``get_db_connection()`` and report user-facing failures by raising
    ``ValueError``.
    """

    @staticmethod
    def register(user_data: UserRegister) -> TokenResponse:
        """Register a new user, create its empty profile, and issue a token.

        The ``users`` row and the ``profiles`` row are written in a single
        transaction: one commit after both inserts, and a rollback if anything
        fails, so a failed profile insert cannot leave a user without a
        profile.

        Args:
            user_data: Registration payload. The ``email``, ``username``,
                ``password`` and ``display_name`` attributes are read.

        Returns:
            A ``TokenResponse`` with the access token, token type ``"bearer"``
            and the id of the newly created user.

        Raises:
            ValueError: If the email is already registered or the username is
                already taken.
        """
        with get_db_connection() as conn:
            cur = conn.cursor()
            try:
                # Check if email already exists
                cur.execute(
                    "SELECT id FROM users WHERE email = %s",
                    (user_data.email,),
                )
                if cur.fetchone():
                    raise ValueError("Email already registered")

                # Check if username already exists
                cur.execute(
                    "SELECT id FROM users WHERE username = %s",
                    (user_data.username,),
                )
                if cur.fetchone():
                    raise ValueError("Username already taken")

                # NOTE(review): these pre-checks are not race-free. If the
                # table has unique constraints, a concurrent registration
                # surfaces as a driver error from the INSERT below rather
                # than a ValueError -- confirm how callers handle that.

                # Hash password and create user
                hashed_pwd = hash_password(user_data.password)
                cur.execute(
                    "INSERT INTO users (username, email, hashed_password) VALUES (%s, %s, %s) RETURNING id",
                    (user_data.username, user_data.email, hashed_pwd),
                )
                user_id = cur.fetchone()[0]

                # Create the profile with placeholder values in the same
                # transaction as the user row.
                cur.execute(
                    """INSERT INTO profiles
                    (user_id, display_name, age, gender, location, bio, interests)
                    VALUES (%s, %s, %s, %s, %s, %s, %s)""",
                    (
                        user_id,
                        user_data.display_name,
                        0,
                        "not_specified",
                        "",
                        "",
                        json.dumps([]),
                    ),
                )

                # Single commit: user and profile become visible together.
                conn.commit()
            except Exception:
                # Undo any partial write before propagating the error.
                conn.rollback()
                raise
            finally:
                cur.close()

            # Generate token
            token = create_access_token(user_id, user_data.email)
            return TokenResponse(
                access_token=token, token_type="bearer", user_id=user_id
            )

    @staticmethod
    def login(user_data: UserLogin) -> TokenResponse:
        """Authenticate a user by email or username and return a token.

        The identifier is matched against both the ``email`` and ``username``
        columns. If more than one row matches (for example, one account's
        username equals another account's email), the row whose email equals
        the identifier is preferred, so an email login is checked against the
        email owner's password hash.

        Args:
            user_data: Login payload. The ``email_or_username`` and
                ``password`` attributes are read.

        Returns:
            A ``TokenResponse`` with the access token, token type ``"bearer"``
            and the authenticated user's id.

        Raises:
            ValueError: If no user matches or the password does not verify.
                The message is the same in both cases.
        """
        identifier = user_data.email_or_username

        with get_db_connection() as conn:
            cur = conn.cursor()
            try:
                cur.execute(
                    "SELECT id, hashed_password, email FROM users WHERE email = %s OR username = %s",
                    (identifier, identifier),
                )
                candidates = cur.fetchall()
            finally:
                cur.close()

            # Try the email match first, then fall back to the username match.
            row = next(
                (r for r in candidates if r[2] == identifier),
                candidates[0] if candidates else None,
            )

            if not row or not verify_password(user_data.password, row[1]):
                raise ValueError("Invalid email, username or password")

            user_id = row[0]
            email = row[2]
            token = create_access_token(user_id, email)
            return TokenResponse(
                access_token=token, token_type="bearer", user_id=user_id
            )
|