from app.db import get_db_connection from app.auth.utils import hash_password, verify_password, create_access_token from app.schemas import UserRegister, UserLogin, TokenResponse import json class AuthService: """Handle user authentication""" @staticmethod def register(user_data: UserRegister) -> TokenResponse: """Register a new user""" with get_db_connection() as conn: cur = conn.cursor() # Check if email already exists cur.execute("SELECT id FROM users WHERE email = %s", (user_data.email,)) if cur.fetchone(): raise ValueError("Email already registered") # Hash password and create user hashed_pwd = hash_password(user_data.password) cur.execute( "INSERT INTO users (email, hashed_password) VALUES (%s, %s) RETURNING id", (user_data.email, hashed_pwd) ) user_id = cur.fetchone()[0] conn.commit() # Create profile cur.execute( """INSERT INTO profiles (user_id, display_name, age, gender, location, bio, interests) VALUES (%s, %s, %s, %s, %s, %s, %s)""", (user_id, user_data.display_name, 0, "not_specified", "", "", json.dumps([])) ) conn.commit() # Generate token token = create_access_token(user_id, user_data.email) return TokenResponse(access_token=token, token_type="bearer", user_id=user_id) @staticmethod def login(user_data: UserLogin) -> TokenResponse: """Authenticate user and return token""" with get_db_connection() as conn: cur = conn.cursor() cur.execute("SELECT id, hashed_password FROM users WHERE email = %s", (user_data.email,)) row = cur.fetchone() if not row or not verify_password(user_data.password, row[1]): raise ValueError("Invalid email or password") user_id = row[0] token = create_access_token(user_id, user_data.email) return TokenResponse(access_token=token, token_type="bearer", user_id=user_id)