# Random Recipes API: FastAPI application module (recipe CRUD, random pick, auth).
import random
|
||
from typing import List, Optional
|
||
from datetime import timedelta
|
||
|
||
from fastapi import FastAPI, HTTPException, Query, Depends
|
||
from fastapi.middleware.cors import CORSMiddleware
|
||
from pydantic import BaseModel, EmailStr, field_validator
|
||
import os
|
||
|
||
import uvicorn
|
||
|
||
from db_utils import (
|
||
list_recipes_db,
|
||
create_recipe_db,
|
||
get_recipes_by_filters_db,
|
||
update_recipe_db,
|
||
delete_recipe_db,
|
||
get_conn,
|
||
)
|
||
|
||
from auth_utils import (
|
||
hash_password,
|
||
verify_password,
|
||
create_access_token,
|
||
get_current_user,
|
||
ACCESS_TOKEN_EXPIRE_MINUTES,
|
||
)
|
||
|
||
from user_db_utils import (
|
||
create_user,
|
||
get_user_by_username,
|
||
get_user_by_email,
|
||
)
|
||
|
||
|
||
class RecipeBase(BaseModel):
    # Fields common to every recipe payload in this module: RecipeCreate,
    # RecipeUpdate and Recipe all inherit from this model.
    # (Kept as comments: Pydantic exposes class docstrings in the JSON schema.)
    name: str
    meal_type: str  # breakfast / lunch / dinner / snack
    time_minutes: int  # presumably preparation time in minutes - TODO confirm
    made_by: Optional[str] = None  # Person who created this recipe version
    tags: List[str] = []
    ingredients: List[str] = []
    steps: List[str] = []
    image: Optional[str] = None  # Base64-encoded image or image URL
|
||
|
||
|
||
class RecipeCreate(RecipeBase):
    # Request body of POST /recipes; adds no fields beyond RecipeBase.
    pass
|
||
|
||
|
||
class Recipe(RecipeBase):
    # Response model used by the recipe endpoints: the RecipeBase fields plus
    # the record id and ownership information.
    id: int
    user_id: Optional[int] = None  # Recipe owner ID
    owner_display_name: Optional[str] = None  # Owner's display name for filtering
|
||
|
||
|
||
class RecipeUpdate(RecipeBase):
    # Request body of PUT /recipes/{recipe_id}; adds no fields beyond
    # RecipeBase, so name, meal_type and time_minutes are required on update.
    pass
|
||
|
||
|
||
# User models
|
||
class UserRegister(BaseModel):
    # Request body of POST /auth/register.
    # (Kept as comments: Pydantic exposes class docstrings in the JSON schema.)
    username: str
    email: EmailStr
    password: str
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    display_name: str

    @field_validator('username')
    @classmethod
    def username_must_be_english(cls, v: str) -> str:
        # Accept only non-empty ASCII usernames made of letters, digits,
        # '_' and '-'. Raises ValueError (reported by Pydantic as a validation
        # error) otherwise; returns the username unchanged when valid.
        #
        # The empty string must be rejected explicitly: "".isascii() is True
        # and all() over zero characters is True, so it would pass both
        # checks below.
        if not v:
            raise ValueError('שם משתמש לא יכול להיות ריק')
        if not v.isascii():
            raise ValueError('שם משתמש חייב להיות באנגלית בלבד')
        if not all(c.isalnum() or c in '_-' for c in v):
            raise ValueError('שם משתמש יכול להכיל רק אותיות, מספרים, _ ו-')
        return v
|
||
|
||
|
||
class UserLogin(BaseModel):
    # Request body of POST /auth/login: the credentials to verify.
    username: str
    password: str  # plain-text password; checked with verify_password() in login()
|
||
|
||
|
||
class Token(BaseModel):
    # Response body of POST /auth/login.
    access_token: str  # value returned by create_access_token()
    token_type: str  # login() always sets this to "bearer"
|
||
|
||
|
||
class UserResponse(BaseModel):
    # Public view of a user returned by /auth/register and /auth/me.
    # It deliberately has no password or password-hash field.
    id: int
    username: str
    email: str
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    display_name: str
    is_admin: bool = False  # defaults to False when the caller does not set it
|
||
|
||
|
||
# The FastAPI application object; every route below is registered on it.
app = FastAPI(
    title="Random Recipes API",
    description="API פשוט לבחירת מתכונים לצהריים / ערב / כל ארוחה 😋",
)

# Allow CORS from frontend domains
# The localhost entries are presumably local frontend dev servers - TODO
# confirm. The my-recipes.dvirlabs.com host is allowed over both https and
# plain http.
allowed_origins = [
    "http://localhost:5173",
    "http://localhost:3000",
    "https://my-recipes.dvirlabs.com",
    "http://my-recipes.dvirlabs.com",
]

# Credentials are allowed, and every method and header is accepted, but only
# for the explicit origins listed above.
app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
||
|
||
|
||
@app.get("/recipes", response_model=List[Recipe])
def list_recipes():
    # Documented with comments rather than a docstring: FastAPI copies a
    # handler's docstring into the published OpenAPI description.
    #
    # Returns one Recipe model per row produced by list_recipes_db().
    # Falsy list columns (tags, ingredients, steps) become empty lists, and
    # optional columns are read with .get() so a missing key becomes None.
    result = []
    for row in list_recipes_db():
        fields = {
            "id": row["id"],
            "name": row["name"],
            "meal_type": row["meal_type"],
            "time_minutes": row["time_minutes"],
            "made_by": row.get("made_by"),
            "tags": row["tags"] or [],
            "ingredients": row["ingredients"] or [],
            "steps": row["steps"] or [],
            "image": row.get("image"),
            "user_id": row.get("user_id"),
            "owner_display_name": row.get("owner_display_name"),
        }
        result.append(Recipe(**fields))
    return result
|
||
|
||
|
||
@app.post("/recipes", response_model=Recipe, status_code=201)
def create_recipe(recipe_in: RecipeCreate, current_user: dict = Depends(get_current_user)):
    # Documented with comments rather than a docstring: FastAPI copies a
    # handler's docstring into the published OpenAPI description.
    #
    # Creates a recipe owned by the authenticated user and returns it.
    #   recipe_in    - validated request body.
    #   current_user - dict supplied by get_current_user; "user_id" is
    #                  required, "display_name" is optional.
    # Returns the stored row as a Recipe (HTTP 201).

    # model_dump() is the Pydantic v2 replacement for the deprecated .dict();
    # this module already requires v2 (it imports field_validator).
    data = recipe_in.model_dump()
    # meal_type is stored lower-cased regardless of how the client sent it.
    data["meal_type"] = data["meal_type"].lower()
    data["user_id"] = current_user["user_id"]

    row = create_recipe_db(data)

    return Recipe(
        id=row["id"],
        name=row["name"],
        meal_type=row["meal_type"],
        time_minutes=row["time_minutes"],
        made_by=row.get("made_by"),
        tags=row["tags"] or [],
        ingredients=row["ingredients"] or [],
        steps=row["steps"] or [],
        image=row.get("image"),
        user_id=row.get("user_id"),
        # The creator is the owner, so the caller's display name is correct here.
        owner_display_name=current_user.get("display_name"),
    )
|
||
|
||
@app.put("/recipes/{recipe_id}", response_model=Recipe)
def update_recipe(recipe_id: int, recipe_in: RecipeUpdate, current_user: dict = Depends(get_current_user)):
    # Documented with comments rather than a docstring: FastAPI copies a
    # handler's docstring into the published OpenAPI description.
    #
    # Replaces the fields of an existing recipe.
    #   recipe_id    - id of the recipe to update.
    #   recipe_in    - validated request body with the new field values.
    #   current_user - dict supplied by get_current_user.
    # Raises HTTPException 404 when the recipe does not exist and 403 when the
    # caller is neither the owner nor an admin.

    # Check ownership BEFORE updating (admins can edit any recipe)
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT user_id FROM recipes WHERE id = %s", (recipe_id,))
            recipe = cur.fetchone()
    finally:
        conn.close()

    if not recipe:
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")

    # Allow if user is owner OR admin
    is_owner = recipe["user_id"] == current_user["user_id"]
    if not is_owner and not current_user.get("is_admin", False):
        raise HTTPException(status_code=403, detail="אין לך הרשאה לערוך מתכון זה")

    # model_dump() is the Pydantic v2 replacement for the deprecated .dict();
    # this module already requires v2 (it imports field_validator).
    data = recipe_in.model_dump()
    data["meal_type"] = data["meal_type"].lower()

    row = update_recipe_db(recipe_id, data)
    # The row can be gone by now (e.g. deleted after the ownership check).
    if not row:
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")

    # owner_display_name must describe the recipe's owner, not the editor:
    # when an admin edits someone else's recipe, the admin's own display name
    # is not reported as the owner's. Prefer a value carried by the row (if
    # update_recipe_db provides one - TODO confirm); fall back to the caller's
    # name only when the caller is the owner.
    owner_display_name = row.get("owner_display_name")
    if owner_display_name is None and is_owner:
        owner_display_name = current_user.get("display_name")

    return Recipe(
        id=row["id"],
        name=row["name"],
        meal_type=row["meal_type"],
        time_minutes=row["time_minutes"],
        made_by=row.get("made_by"),
        tags=row["tags"] or [],
        ingredients=row["ingredients"] or [],
        steps=row["steps"] or [],
        image=row.get("image"),
        user_id=row.get("user_id"),
        owner_display_name=owner_display_name,
    )
|
||
|
||
|
||
@app.delete("/recipes/{recipe_id}", status_code=204)
def delete_recipe(recipe_id: int, current_user: dict = Depends(get_current_user)):
    # Documented with comments rather than a docstring: FastAPI copies a
    # handler's docstring into the published OpenAPI description.
    #
    # Deletes a recipe on behalf of its owner or an admin and answers 204.
    # Raises HTTPException 404 when the recipe does not exist (either at the
    # ownership lookup or at the delete itself) and 403 when the caller is
    # neither the owner nor an admin.

    # Look up the owner first; the connection is closed whatever happens.
    connection = get_conn()
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT user_id FROM recipes WHERE id = %s", (recipe_id,))
            owner_row = cursor.fetchone()
    finally:
        connection.close()

    if not owner_row:
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")

    # Permitted for the owner, or for any admin.
    if not (owner_row["user_id"] == current_user["user_id"] or current_user.get("is_admin", False)):
        raise HTTPException(status_code=403, detail="אין לך הרשאה למחוק מתכון זה")

    if not delete_recipe_db(recipe_id):
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")
    return None
|
||
|
||
|
||
@app.get("/recipes/random", response_model=Recipe)
def random_recipe(
    meal_type: Optional[str] = None,
    max_time: Optional[int] = None,
    ingredients: Optional[List[str]] = Query(
        None,
        description="רשימת מרכיבים (ingredients=ביצה&ingredients=עגבניה...)",
    ),
):
    # Documented with comments rather than a docstring: FastAPI copies a
    # handler's docstring into the published OpenAPI description.
    #
    # Picks one recipe at random among those matching the optional filters.
    #   meal_type   - passed to get_recipes_by_filters_db (lower-cased first).
    #   max_time    - passed to get_recipes_by_filters_db unchanged.
    #   ingredients - repeated query parameter; a recipe qualifies only if it
    #                 lists every requested ingredient (exact match after
    #                 trimming whitespace and lower-casing).
    # Raises HTTPException 404 when nothing matches.

    # The create/update handlers in this module store meal_type lower-cased,
    # so the filter value is normalized the same way before querying.
    if meal_type is not None:
        meal_type = meal_type.lower()

    rows = get_recipes_by_filters_db(meal_type, max_time)

    recipes = [
        Recipe(
            id=r["id"],
            name=r["name"],
            meal_type=r["meal_type"],
            time_minutes=r["time_minutes"],
            made_by=r.get("made_by"),
            tags=r["tags"] or [],
            ingredients=r["ingredients"] or [],
            steps=r["steps"] or [],
            image=r.get("image"),
            user_id=r.get("user_id"),
            owner_display_name=r.get("owner_display_name"),
        )
        for r in rows
    ]

    # Ingredient filtering is done here in the application.
    if ingredients:
        desired = {i.strip().lower() for i in ingredients if i.strip()}
        if desired:
            # Stored ingredients get the same strip + lower normalization as
            # the requested ones; otherwise a stored value with surrounding
            # whitespace could never match.
            recipes = [
                r
                for r in recipes
                if desired.issubset({ing.strip().lower() for ing in r.ingredients})
            ]

    if not recipes:
        raise HTTPException(
            status_code=404,
            detail="לא נמצאו מתכונים מתאימים לפילטרים שבחרת",
        )

    return random.choice(recipes)
|
||
|
||
|
||
# Authentication endpoints
|
||
@app.post("/auth/register", response_model=UserResponse, status_code=201)
def register(user: UserRegister):
    """Register a new user"""
    # Rejects the request with HTTP 400 when the username, the email or the
    # display name is already taken (checked in that order), then stores the
    # user with a hashed password and returns the new record. Progress is
    # traced to stdout with print().
    print(f"[REGISTER] Starting registration for username: {user.username}")

    # Username must be unused.
    print("[REGISTER] Checking if username exists...")
    if get_user_by_username(user.username):
        print("[REGISTER] Username already exists")
        raise HTTPException(status_code=400, detail="שם המשתמש כבר קיים במערכת")

    # Email must be unused.
    print("[REGISTER] Checking if email exists...")
    if get_user_by_email(user.email):
        print("[REGISTER] Email already exists")
        raise HTTPException(status_code=400, detail="האימייל כבר רשום במערכת")

    # Display name must be unused.
    print("[REGISTER] Checking if display_name exists...")
    from user_db_utils import get_user_by_display_name

    if get_user_by_display_name(user.display_name):
        print("[REGISTER] Display name already exists")
        raise HTTPException(status_code=400, detail="שם התצוגה כבר קיים במערכת")

    # Only the hash of the password is handed to create_user().
    print("[REGISTER] Hashing password...")
    hashed = hash_password(user.password)
    print("[REGISTER] Creating user in database...")
    created = create_user(
        user.username,
        user.email,
        hashed,
        user.first_name,
        user.last_name,
        user.display_name,
    )
    print(f"[REGISTER] User created successfully: {created['id']}")

    profile = {
        "id": created["id"],
        "username": created["username"],
        "email": created["email"],
        "first_name": created.get("first_name"),
        "last_name": created.get("last_name"),
        "display_name": created["display_name"],
    }
    return UserResponse(**profile)
|
||
|
||
|
||
@app.post("/auth/login", response_model=Token)
def login(user: UserLogin):
    """Login user and return JWT token"""
    account = get_user_by_username(user.username)

    # An unknown username and a wrong password produce the same 401 response.
    # The password is only verified when the user exists (short-circuit).
    if not account or not verify_password(user.password, account["password_hash"]):
        raise HTTPException(status_code=401, detail="שם משתמש או סיסמה שגויים")

    # The token carries the user id (as a string) and the username, and
    # expires after ACCESS_TOKEN_EXPIRE_MINUTES.
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    claims = {"sub": str(account["id"]), "username": account["username"]}
    token = create_access_token(data=claims, expires_delta=lifetime)

    return Token(access_token=token, token_type="bearer")
|
||
|
||
|
||
@app.get("/auth/me", response_model=UserResponse)
def get_me(current_user: dict = Depends(get_current_user)):
    """Get current logged-in user info"""
    # Loads the authenticated user's record by id and returns its public
    # fields. Raises HTTPException 404 when the record no longer exists.
    from user_db_utils import get_user_by_id

    user = get_user_by_id(current_user["user_id"])
    if not user:
        raise HTTPException(status_code=404, detail="משתמש לא נמצא")

    return UserResponse(
        id=user["id"],
        username=user["username"],
        email=user["email"],
        first_name=user.get("first_name"),
        last_name=user.get("last_name"),
        display_name=user["display_name"],
        # Without this the model default (False) was always returned, so an
        # admin could never be identified from this endpoint. bool() also maps
        # a missing or NULL value to False.
        # NOTE(review): assumes the user record exposes an "is_admin" key - confirm.
        is_admin=bool(user.get("is_admin", False)),
    )
|
||
|
||
|
||
# Script entry point: serve the app with uvicorn on all interfaces, port 8000,
# with auto-reload enabled. The "main:app" import string presumably means this
# module is main.py - TODO confirm.
if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)