2025-12-11 15:51:54 +02:00

751 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import random
from typing import List, Optional
from datetime import timedelta
from fastapi import FastAPI, HTTPException, Query, Depends, Response
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, EmailStr, field_validator
import os
import uvicorn
from db_utils import (
list_recipes_db,
create_recipe_db,
get_recipes_by_filters_db,
update_recipe_db,
delete_recipe_db,
get_conn,
)
from auth_utils import (
hash_password,
verify_password,
create_access_token,
get_current_user,
ACCESS_TOKEN_EXPIRE_MINUTES,
)
from user_db_utils import (
create_user,
get_user_by_username,
get_user_by_email,
)
from grocery_db_utils import (
create_grocery_list,
get_user_grocery_lists,
get_grocery_list_by_id,
update_grocery_list,
delete_grocery_list,
share_grocery_list,
unshare_grocery_list,
get_grocery_list_shares,
search_users,
toggle_grocery_list_pin,
)
from notification_db_utils import (
create_notification,
get_user_notifications,
mark_notification_as_read,
mark_all_notifications_as_read,
delete_notification,
)
class RecipeBase(BaseModel):
    """Recipe fields common to the create, update and response models."""

    name: str
    # Expected values: breakfast / lunch / dinner / snack (not validated here)
    meal_type: str
    time_minutes: int
    # Person who created this recipe version
    made_by: Optional[str] = None
    tags: List[str] = []
    ingredients: List[str] = []
    steps: List[str] = []
    # Base64-encoded image or image URL
    image: Optional[str] = None
class RecipeCreate(RecipeBase):
    """Request body for creating a recipe; adds no fields to ``RecipeBase``."""
class Recipe(RecipeBase):
    """Recipe as returned by the API: the base fields plus identifiers."""

    id: int
    # Recipe owner ID
    user_id: Optional[int] = None
    # Owner's display name for filtering
    owner_display_name: Optional[str] = None
class RecipeUpdate(RecipeBase):
    """Request body for updating a recipe; adds no fields to ``RecipeBase``."""
# User models
class UserRegister(BaseModel):
    """Request body for registering a new user."""

    username: str
    email: EmailStr
    password: str
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    display_name: str

    @field_validator('username')
    @classmethod
    def username_must_be_english(cls, v: str) -> str:
        """Validate that the username is non-empty and ASCII-only.

        Allowed characters are ASCII letters, digits, ``_`` and ``-``.

        Args:
            v: The username supplied in the request.

        Returns:
            The username unchanged when it is valid.

        Raises:
            ValueError: If the username is empty, contains non-ASCII
                characters, or contains a character outside the allowed set.
        """
        # An empty string would pass both checks below ("".isascii() is True
        # and all() over an empty iterable is True), so reject it explicitly.
        if not v:
            raise ValueError('שם משתמש לא יכול להיות ריק')
        if not v.isascii():
            raise ValueError('שם משתמש חייב להיות באנגלית בלבד')
        if not all(c.isalnum() or c in '_-' for c in v):
            raise ValueError('שם משתמש יכול להכיל רק אותיות, מספרים, _ ו-')
        return v
class UserLogin(BaseModel):
    """Request body for the login endpoint: plain username and password."""

    username: str
    password: str
class Token(BaseModel):
    """Response body of the login endpoint: the access token and its type."""

    access_token: str
    token_type: str
class UserResponse(BaseModel):
    """User details returned to clients; carries no password field."""

    id: int
    username: str
    email: str
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    display_name: str
    is_admin: bool = False
# Grocery List models
class GroceryListCreate(BaseModel):
    """Request body for creating a grocery list; ``items`` defaults to empty."""

    name: str
    items: List[str] = []
class GroceryListUpdate(BaseModel):
    """Request body for updating a grocery list; both fields are optional."""

    name: Optional[str] = None
    items: Optional[List[str]] = None
class GroceryList(BaseModel):
    """Grocery list as returned by the API, including per-user permission flags."""

    id: int
    name: str
    items: List[str]
    owner_id: int
    is_pinned: bool = False
    owner_display_name: Optional[str] = None
    # Permission flags; both default to False when not supplied.
    can_edit: bool = False
    is_owner: bool = False
    # Timestamps are carried as strings (endpoints convert them with str()).
    created_at: str
    updated_at: str
class ShareGroceryList(BaseModel):
    """Request body for sharing a grocery list with another user."""

    # Can be username or display_name
    user_identifier: str
    can_edit: bool = False
class GroceryListShare(BaseModel):
    """A single share record of a grocery list, with the recipient's details."""

    id: int
    list_id: int
    shared_with_user_id: int
    username: str
    display_name: str
    email: str
    can_edit: bool
    # Carried as a string (endpoints convert the stored value with str()).
    shared_at: str
class UserSearch(BaseModel):
    """One entry in the user-search results."""

    id: int
    username: str
    display_name: str
    email: str
class Notification(BaseModel):
    """A user notification as returned by the API."""

    id: int
    user_id: int
    type: str
    message: str
    # Optional ID of a related entity (the share endpoint stores the list id here).
    related_id: Optional[int] = None
    is_read: bool
    created_at: str
# Allow CORS from frontend domains
allowed_origins = [
    "http://localhost:5173",
    "http://localhost:3000",
    "https://my-recipes.dvirlabs.com",
    "http://my-recipes.dvirlabs.com",
]

# The application object every route decorator in this module attaches to.
app = FastAPI(
    title="Random Recipes API",
    description="API פשוט לבחירת מתכונים לצהריים / ערב / כל ארוחה 😋",
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "PATCH", "DELETE", "OPTIONS"],
    allow_headers=["*"],
    # Disable CORS preflight caching
    max_age=0,
)
@app.get("/recipes", response_model=List[Recipe])
def list_recipes():
    """Return every row from ``list_recipes_db`` converted to a ``Recipe``."""
    result = []
    for row in list_recipes_db():
        result.append(
            Recipe(
                id=row["id"],
                name=row["name"],
                meal_type=row["meal_type"],
                time_minutes=row["time_minutes"],
                made_by=row.get("made_by"),
                # Falsy list columns (e.g. None) become empty lists.
                tags=row["tags"] or [],
                ingredients=row["ingredients"] or [],
                steps=row["steps"] or [],
                image=row.get("image"),
                user_id=row.get("user_id"),
                owner_display_name=row.get("owner_display_name"),
            )
        )
    return result
@app.post("/recipes", response_model=Recipe, status_code=201)
def create_recipe(recipe_in: RecipeCreate, current_user: dict = Depends(get_current_user)):
    """Create a recipe owned by the authenticated user.

    The meal type is lower-cased and the caller's ``user_id`` is attached
    before the data is passed to ``create_recipe_db``.

    Args:
        recipe_in: Recipe fields from the request body.
        current_user: Authenticated user, as provided by ``get_current_user``.

    Returns:
        The created recipe built from the row returned by ``create_recipe_db``.
    """
    # model_dump() is the Pydantic v2 replacement for the deprecated .dict().
    data = recipe_in.model_dump()
    data["meal_type"] = data["meal_type"].lower()
    data["user_id"] = current_user["user_id"]

    row = create_recipe_db(data)

    return Recipe(
        id=row["id"],
        name=row["name"],
        meal_type=row["meal_type"],
        time_minutes=row["time_minutes"],
        made_by=row.get("made_by"),
        tags=row["tags"] or [],
        ingredients=row["ingredients"] or [],
        steps=row["steps"] or [],
        image=row.get("image"),
        user_id=row.get("user_id"),
        # The creator is the owner, so their display name is the owner's.
        owner_display_name=current_user.get("display_name"),
    )
@app.put("/recipes/{recipe_id}", response_model=Recipe)
def update_recipe(recipe_id: int, recipe_in: RecipeUpdate, current_user: dict = Depends(get_current_user)):
    """Update a recipe; allowed for its owner or for an admin.

    Args:
        recipe_id: ID of the recipe to update.
        recipe_in: New recipe fields from the request body.
        current_user: Authenticated user, as provided by ``get_current_user``.

    Returns:
        The updated recipe built from the row returned by ``update_recipe_db``.

    Raises:
        HTTPException: 404 if the recipe does not exist, 403 if the caller is
            neither the owner nor an admin.
    """
    # Check ownership BEFORE updating (admins can edit any recipe)
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT user_id FROM recipes WHERE id = %s", (recipe_id,))
            recipe = cur.fetchone()
    finally:
        conn.close()

    if not recipe:
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")

    owner_id = recipe["user_id"]
    caller_is_owner = owner_id == current_user["user_id"]
    # Allow if user is owner OR admin
    if not caller_is_owner and not current_user.get("is_admin", False):
        raise HTTPException(status_code=403, detail="אין לך הרשאה לערוך מתכון זה")

    # model_dump() is the Pydantic v2 replacement for the deprecated .dict().
    data = recipe_in.model_dump()
    data["meal_type"] = data["meal_type"].lower()

    row = update_recipe_db(recipe_id, data)
    if not row:
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")

    # Only report the caller's display name as the owner's when the caller
    # really is the owner. For an admin editing someone else's recipe, fall
    # back to whatever the row carries (None if update_recipe_db does not
    # return an owner_display_name column — not visible from this module).
    if caller_is_owner:
        owner_display_name = current_user.get("display_name")
    else:
        owner_display_name = row.get("owner_display_name")

    return Recipe(
        id=row["id"],
        name=row["name"],
        meal_type=row["meal_type"],
        time_minutes=row["time_minutes"],
        made_by=row.get("made_by"),
        tags=row["tags"] or [],
        ingredients=row["ingredients"] or [],
        steps=row["steps"] or [],
        image=row.get("image"),
        user_id=row.get("user_id"),
        owner_display_name=owner_display_name,
    )
@app.delete("/recipes/{recipe_id}", status_code=204)
def delete_recipe(recipe_id: int, current_user: dict = Depends(get_current_user)):
    """Delete a recipe; allowed for its owner or for an admin.

    Raises:
        HTTPException: 404 if the recipe does not exist (or nothing was
            deleted), 403 if the caller is neither the owner nor an admin.
    """
    # Look the recipe up first so ownership can be checked before deleting.
    connection = get_conn()
    try:
        with connection.cursor() as cursor:
            cursor.execute("SELECT user_id FROM recipes WHERE id = %s", (recipe_id,))
            found = cursor.fetchone()
    finally:
        connection.close()

    if not found:
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")

    caller_is_owner = found["user_id"] == current_user["user_id"]
    caller_is_admin = current_user.get("is_admin", False)
    # Owners and admins may delete; everyone else is rejected.
    if not (caller_is_owner or caller_is_admin):
        raise HTTPException(status_code=403, detail="אין לך הרשאה למחוק מתכון זה")

    if not delete_recipe_db(recipe_id):
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")
    return None
@app.get("/recipes/random", response_model=Recipe)
def random_recipe(
    meal_type: Optional[str] = None,
    max_time: Optional[int] = None,
    ingredients: Optional[List[str]] = Query(
        None,
        description="רשימת מרכיבים (ingredients=ביצה&ingredients=עגבניה...)",
    ),
):
    """Return one randomly chosen recipe matching the given filters.

    ``meal_type`` and ``max_time`` are passed to ``get_recipes_by_filters_db``;
    the ingredient filter is applied here, on the returned rows.

    Args:
        meal_type: Optional meal type filter.
        max_time: Optional time filter, forwarded to the DB helper.
        ingredients: Optional ingredients that must all appear in the recipe
            (compared case-insensitively, ignoring surrounding whitespace).

    Returns:
        A single ``Recipe`` picked with ``random.choice``.

    Raises:
        HTTPException: 404 when no recipe matches the filters.
    """
    rows = get_recipes_by_filters_db(meal_type, max_time)

    recipes = [
        Recipe(
            id=r["id"],
            name=r["name"],
            meal_type=r["meal_type"],
            time_minutes=r["time_minutes"],
            made_by=r.get("made_by"),
            tags=r["tags"] or [],
            ingredients=r["ingredients"] or [],
            steps=r["steps"] or [],
            image=r.get("image"),
            user_id=r.get("user_id"),
            owner_display_name=r.get("owner_display_name"),
        )
        for r in rows
    ]

    # Filter by ingredients in the application
    if ingredients:
        desired = {i.strip().lower() for i in ingredients if i.strip()}
        if desired:
            # Normalize stored ingredients the same way as the requested ones
            # (strip + lower); otherwise a stored value with surrounding
            # whitespace could never match a stripped query value.
            recipes = [
                r
                for r in recipes
                if desired.issubset({ing.strip().lower() for ing in r.ingredients})
            ]

    if not recipes:
        raise HTTPException(
            status_code=404,
            detail="לא נמצאו מתכונים מתאימים לפילטרים שבחרת",
        )

    return random.choice(recipes)
# Authentication endpoints
@app.post("/auth/register", response_model=UserResponse, status_code=201)
def register(user: UserRegister):
    """Register a new user.

    Rejects the request with HTTP 400 when the username, email or display
    name is already taken (checked in that order), then hashes the password
    and creates the user. Progress is printed with a ``[REGISTER]`` prefix.
    """
    print(f"[REGISTER] Starting registration for username: {user.username}")

    # Username must be unique
    print("[REGISTER] Checking if username exists...")
    if get_user_by_username(user.username):
        print("[REGISTER] Username already exists")
        raise HTTPException(status_code=400, detail="שם המשתמש כבר קיים במערכת")

    # Email must be unique
    print("[REGISTER] Checking if email exists...")
    if get_user_by_email(user.email):
        print("[REGISTER] Email already exists")
        raise HTTPException(status_code=400, detail="האימייל כבר רשום במערכת")

    # Display name must be unique
    print("[REGISTER] Checking if display_name exists...")
    from user_db_utils import get_user_by_display_name
    if get_user_by_display_name(user.display_name):
        print("[REGISTER] Display name already exists")
        raise HTTPException(status_code=400, detail="שם התצוגה כבר קיים במערכת")

    # Only the hash of the password is passed on to create_user
    print("[REGISTER] Hashing password...")
    hashed = hash_password(user.password)

    print("[REGISTER] Creating user in database...")
    created = create_user(
        user.username,
        user.email,
        hashed,
        user.first_name,
        user.last_name,
        user.display_name,
    )
    print(f"[REGISTER] User created successfully: {created['id']}")

    return UserResponse(
        id=created["id"],
        username=created["username"],
        email=created["email"],
        first_name=created.get("first_name"),
        last_name=created.get("last_name"),
        display_name=created["display_name"],
        is_admin=created.get("is_admin", False),
    )
@app.post("/auth/login", response_model=Token)
def login(user: UserLogin):
    """Log a user in and return a bearer access token.

    Raises:
        HTTPException: 401 with the same message whether the username is
            unknown or the password does not verify.
    """
    account = get_user_by_username(user.username)

    # Short-circuit: the password is only verified when the user exists.
    if not account or not verify_password(user.password, account["password_hash"]):
        raise HTTPException(
            status_code=401,
            detail="שם משתמש או סיסמה שגויים",
        )

    claims = {"sub": str(account["id"]), "username": account["username"]}
    lifetime = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token = create_access_token(data=claims, expires_delta=lifetime)

    return Token(access_token=token, token_type="bearer")
@app.get("/auth/me", response_model=UserResponse)
def get_me(current_user: dict = Depends(get_current_user)):
    """Return the profile of the currently authenticated user.

    Raises:
        HTTPException: 404 when ``get_user_by_id`` finds no user for the
            authenticated ``user_id``.
    """
    from user_db_utils import get_user_by_id

    profile = get_user_by_id(current_user["user_id"])
    if profile:
        return UserResponse(
            id=profile["id"],
            username=profile["username"],
            email=profile["email"],
            first_name=profile.get("first_name"),
            last_name=profile.get("last_name"),
            display_name=profile["display_name"],
            is_admin=profile.get("is_admin", False),
        )
    raise HTTPException(status_code=404, detail="משתמש לא נמצא")
# ============= Grocery Lists Endpoints =============
@app.get("/grocery-lists", response_model=List[GroceryList])
def list_grocery_lists(current_user: dict = Depends(get_current_user)):
    """Get all grocery lists owned by or shared with the current user."""
    entries = get_user_grocery_lists(current_user["user_id"])
    # The response model declares both timestamps as str, so convert in place.
    for entry in entries:
        for stamp in ("created_at", "updated_at"):
            entry[stamp] = str(entry[stamp])
    return entries
@app.post("/grocery-lists", response_model=GroceryList, status_code=201)
def create_new_grocery_list(
    grocery_list: GroceryListCreate,
    current_user: dict = Depends(get_current_user)
):
    """Create a grocery list owned by the current user and return it."""
    created = create_grocery_list(
        owner_id=current_user["user_id"],
        name=grocery_list.name,
        items=grocery_list.items,
    )
    # The creator is the owner: fill in the permission flags and convert the
    # timestamps to the strings the response model expects.
    created.update(
        {
            "owner_display_name": current_user.get("display_name"),
            "can_edit": True,
            "is_owner": True,
            "created_at": str(created["created_at"]),
            "updated_at": str(created["updated_at"]),
        }
    )
    return created
@app.get("/grocery-lists/{list_id}", response_model=GroceryList)
def get_grocery_list(list_id: int, current_user: dict = Depends(get_current_user)):
    """Get a specific grocery list.

    Args:
        list_id: ID of the grocery list to fetch.
        current_user: Authenticated user, as provided by ``get_current_user``.

    Returns:
        The list returned by ``get_grocery_list_by_id`` with its timestamps
        converted to strings.

    Raises:
        HTTPException: 404 when the lookup for this list and user returns
            nothing.
    """
    grocery_list = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not grocery_list:
        # The original message ended in Arabic letters ("ها") that look like
        # the intended Hebrew suffix; it is now spelled in Hebrew only.
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה או שאין לך גישה אליה")

    # The response model declares both timestamps as str.
    grocery_list["created_at"] = str(grocery_list["created_at"])
    grocery_list["updated_at"] = str(grocery_list["updated_at"])
    return grocery_list
@app.put("/grocery-lists/{list_id}", response_model=GroceryList)
def update_grocery_list_endpoint(
    list_id: int,
    grocery_list_update: GroceryListUpdate,
    current_user: dict = Depends(get_current_user)
):
    """Update a grocery list.

    Args:
        list_id: ID of the grocery list to update.
        grocery_list_update: New name and/or items from the request body.
        current_user: Authenticated user, as provided by ``get_current_user``.

    Returns:
        The list re-read after the update, with timestamps as strings.

    Raises:
        HTTPException: 404 when the list cannot be found for this user (before
            or after the update), 403 when the user lacks edit permission.
    """
    # Check if user has access
    existing = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not existing:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")

    # Check if user has edit permission
    if not existing["can_edit"]:
        raise HTTPException(status_code=403, detail="אין לך הרשאה לערוך רשימת קניות זו")

    updated = update_grocery_list(
        list_id=list_id,
        name=grocery_list_update.name,
        items=grocery_list_update.items
    )
    if not updated:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")

    # Get full details with permissions
    result = get_grocery_list_by_id(list_id, current_user["user_id"])
    # The same lookup is treated as nullable above; if the list vanished or
    # access was removed in the meantime, answer 404 instead of crashing on
    # the subscript below.
    if not result:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")

    result["created_at"] = str(result["created_at"])
    result["updated_at"] = str(result["updated_at"])
    return result
@app.delete("/grocery-lists/{list_id}", status_code=204)
def delete_grocery_list_endpoint(list_id: int, current_user: dict = Depends(get_current_user)):
    """Delete a grocery list (owner only).

    Raises:
        HTTPException: 404 when the list is not found for this user or
            nothing was deleted, 403 when the caller is not the owner.
    """
    target = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not target:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")

    # Only the owner may delete
    if not target["is_owner"]:
        raise HTTPException(status_code=403, detail="רק הבעלים יכול למחוק רשימת קניות")

    if not delete_grocery_list(list_id):
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")
    return None
@app.options("/grocery-lists/{list_id}/pin")
async def options_pin_grocery_list(list_id: int):
    """Answer the CORS preflight for the pin endpoint with an empty 200."""
    preflight_ok = Response(status_code=200)
    return preflight_ok
@app.patch("/grocery-lists/{list_id}/pin", response_model=GroceryList)
def toggle_pin_grocery_list_endpoint(list_id: int, current_user: dict = Depends(get_current_user)):
    """Toggle pin status for a grocery list (owner only).

    NOTE(review): the owner check is presumably done inside
    ``toggle_grocery_list_pin`` (it receives the user id) — confirm there.

    Args:
        list_id: ID of the grocery list to pin or unpin.
        current_user: Authenticated user, as provided by ``get_current_user``.

    Returns:
        The list re-read after the toggle, with timestamps as strings.

    Raises:
        HTTPException: 404 when the toggle reports nothing updated, or when
            the list can no longer be read afterwards.
    """
    updated = toggle_grocery_list_pin(list_id, current_user["user_id"])
    if not updated:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה או שאין לך הרשאה")

    # Get full details with permissions
    result = get_grocery_list_by_id(list_id, current_user["user_id"])
    # The lookup can return nothing (other endpoints check for that); answer
    # 404 instead of failing on the subscript below.
    if not result:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה או שאין לך הרשאה")

    result["created_at"] = str(result["created_at"])
    result["updated_at"] = str(result["updated_at"])
    return result
@app.post("/grocery-lists/{list_id}/share", response_model=GroceryListShare)
def share_grocery_list_endpoint(
    list_id: int,
    share_data: ShareGroceryList,
    current_user: dict = Depends(get_current_user)
):
    """Share a grocery list with another user and notify that user.

    Raises:
        HTTPException: 404 when the list or the target user is not found,
            403 when the caller is not the owner, 400 when the target is the
            caller.
    """
    owner_id = current_user["user_id"]

    # Only the owner may share
    existing = get_grocery_list_by_id(list_id, owner_id)
    if not existing:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")
    if not existing["is_owner"]:
        raise HTTPException(status_code=403, detail="רק הבעלים יכול לשתף רשימת קניות")

    # Resolve the recipient: try the username first, then the display name.
    identifier = share_data.user_identifier
    target_user = get_user_by_username(identifier)
    if not target_user:
        from user_db_utils import get_user_by_display_name
        target_user = get_user_by_display_name(identifier)
        if not target_user:
            raise HTTPException(status_code=404, detail="משתמש לא נמצא")

    recipient_id = target_user["id"]

    # Sharing a list with yourself is rejected
    if recipient_id == owner_id:
        raise HTTPException(status_code=400, detail="לא ניתן לשתף רשימה עם עצמך")

    share = share_grocery_list(list_id, recipient_id, share_data.can_edit)

    # Tell the recipient about the new share
    notification_message = f"רשימת קניות '{existing['name']}' שותפה איתך על ידי {current_user['display_name']}"
    create_notification(
        user_id=recipient_id,
        type="grocery_share",
        message=notification_message,
        related_id=list_id,
    )

    # Share record combined with the recipient's user details
    return GroceryListShare(
        id=share["id"],
        list_id=share["list_id"],
        shared_with_user_id=share["shared_with_user_id"],
        username=target_user["username"],
        display_name=target_user["display_name"],
        email=target_user["email"],
        can_edit=share["can_edit"],
        shared_at=str(share["shared_at"]),
    )
@app.get("/grocery-lists/{list_id}/shares", response_model=List[GroceryListShare])
def get_grocery_list_shares_endpoint(
    list_id: int,
    current_user: dict = Depends(get_current_user)
):
    """List the users a grocery list is shared with (owner only).

    Raises:
        HTTPException: 404 when the list is not found for this user, 403
            when the caller is not the owner.
    """
    owned = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not owned:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")
    if not owned["is_owner"]:
        raise HTTPException(status_code=403, detail="רק הבעלים יכול לראות את רשימת השיתופים")

    response = []
    for record in get_grocery_list_shares(list_id):
        response.append(
            GroceryListShare(
                id=record["id"],
                list_id=record["list_id"],
                shared_with_user_id=record["shared_with_user_id"],
                username=record["username"],
                display_name=record["display_name"],
                email=record["email"],
                can_edit=record["can_edit"],
                # The response model declares shared_at as str.
                shared_at=str(record["shared_at"]),
            )
        )
    return response
@app.delete("/grocery-lists/{list_id}/shares/{user_id}", status_code=204)
def unshare_grocery_list_endpoint(
    list_id: int,
    user_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Remove a user's sharing access to a grocery list (owner only).

    Raises:
        HTTPException: 404 when the list or the share is not found, 403 when
            the caller is not the owner.
    """
    owned = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not owned:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")

    # Only the owner may remove shares
    if not owned["is_owner"]:
        raise HTTPException(status_code=403, detail="רק הבעלים יכול להסיר שיתופים")

    if not unshare_grocery_list(list_id, user_id):
        raise HTTPException(status_code=404, detail="שיתוף לא נמצא")
    return None
@app.get("/users/search", response_model=List[UserSearch])
def search_users_endpoint(
    q: str = Query(..., min_length=1, description="Search query for username or display name"),
    current_user: dict = Depends(get_current_user)
):
    """Search users by username or display name for autocomplete.

    ``current_user`` is not read in the body; the dependency is what makes
    the route require an authenticated caller.
    """
    matches = []
    for found in search_users(q):
        matches.append(
            UserSearch(
                id=found["id"],
                username=found["username"],
                display_name=found["display_name"],
                email=found["email"],
            )
        )
    return matches
# ===========================
# Notification Endpoints
# ===========================
@app.get("/notifications", response_model=List[Notification])
def get_notifications_endpoint(
    unread_only: bool = Query(False, description="Get only unread notifications"),
    current_user: dict = Depends(get_current_user)
):
    """Return the current user's notifications, optionally only unread ones."""
    response = []
    for row in get_user_notifications(current_user["user_id"], unread_only):
        response.append(
            Notification(
                id=row["id"],
                user_id=row["user_id"],
                type=row["type"],
                message=row["message"],
                related_id=row["related_id"],
                is_read=row["is_read"],
                # The response model declares created_at as str.
                created_at=str(row["created_at"]),
            )
        )
    return response
@app.patch("/notifications/{notification_id}/read")
def mark_notification_read_endpoint(
    notification_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Mark one of the current user's notifications as read.

    The result of ``mark_notification_as_read`` is not inspected; the same
    confirmation message is always returned.
    """
    caller_id = current_user["user_id"]
    mark_notification_as_read(notification_id, caller_id)
    confirmation = {"message": "Notification marked as read"}
    return confirmation
@app.patch("/notifications/read-all")
def mark_all_notifications_read_endpoint(
    current_user: dict = Depends(get_current_user)
):
    """Mark every notification of the current user as read."""
    caller_id = current_user["user_id"]
    mark_all_notifications_as_read(caller_id)
    confirmation = {"message": "All notifications marked as read"}
    return confirmation
@app.delete("/notifications/{notification_id}")
def delete_notification_endpoint(
    notification_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Delete one of the current user's notifications.

    The result of ``delete_notification`` is not inspected; the same
    confirmation message is always returned.
    """
    caller_id = current_user["user_id"]
    delete_notification(notification_id, caller_id)
    confirmation = {"message": "Notification deleted"}
    return confirmation
if __name__ == "__main__":
    # Executed directly: serve the app on all interfaces, port 8000, with
    # auto-reload enabled.
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=8000,
        reload=True,
    )