"""Random Recipes API.

FastAPI application exposing recipe CRUD, a random-recipe picker,
authentication, shareable grocery lists and per-user notifications.
Persistence is delegated to the ``*_db_utils`` helper modules.
"""

import os
import random
from datetime import timedelta
from typing import List, Optional

import uvicorn
from fastapi import Depends, FastAPI, HTTPException, Query
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, EmailStr, field_validator

from auth_utils import (
    ACCESS_TOKEN_EXPIRE_MINUTES,
    create_access_token,
    get_current_user,
    hash_password,
    verify_password,
)
from db_utils import (
    create_recipe_db,
    delete_recipe_db,
    get_conn,
    get_recipes_by_filters_db,
    list_recipes_db,
    update_recipe_db,
)
from grocery_db_utils import (
    create_grocery_list,
    delete_grocery_list,
    get_grocery_list_by_id,
    get_grocery_list_shares,
    get_user_grocery_lists,
    search_users,
    share_grocery_list,
    unshare_grocery_list,
    update_grocery_list,
)
from notification_db_utils import (
    create_notification,
    delete_notification,
    get_user_notifications,
    mark_all_notifications_as_read,
    mark_notification_as_read,
)
from user_db_utils import (
    create_user,
    get_user_by_display_name,
    get_user_by_email,
    get_user_by_id,
    get_user_by_username,
)


# ============= Recipe models =============

# Fields shared by every recipe payload (create / update / response).
class RecipeBase(BaseModel):
    name: str
    meal_type: str  # breakfast / lunch / dinner / snack
    time_minutes: int
    made_by: Optional[str] = None  # Person who created this recipe version
    tags: List[str] = []
    ingredients: List[str] = []
    steps: List[str] = []
    image: Optional[str] = None  # Base64-encoded image or image URL


# Request body for POST /recipes.
class RecipeCreate(RecipeBase):
    pass


# Recipe as returned to clients.
class Recipe(RecipeBase):
    id: int
    user_id: Optional[int] = None  # Recipe owner ID
    owner_display_name: Optional[str] = None  # Owner's display name for filtering


# Request body for PUT /recipes/{recipe_id}.
class RecipeUpdate(RecipeBase):
    pass


# ============= User models =============

# Request body for POST /auth/register.
class UserRegister(BaseModel):
    username: str
    email: EmailStr
    password: str
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    display_name: str

    @field_validator('username')
    @classmethod
    def username_must_be_english(cls, v: str) -> str:
        # Usernames are restricted to ASCII letters, digits, '_' and '-'.
        if not v.isascii():
            raise ValueError('שם משתמש חייב להיות באנגלית בלבד')
        if not all(c.isalnum() or c in '_-' for c in v):
            raise ValueError('שם משתמש יכול להכיל רק אותיות, מספרים, _ ו-')
        return v


# Request body for POST /auth/login.
class UserLogin(BaseModel):
    username: str
    password: str


# Response body for POST /auth/login.
class Token(BaseModel):
    access_token: str
    token_type: str


# Public view of a user account (never includes the password hash).
class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    first_name: Optional[str] = None
    last_name: Optional[str] = None
    display_name: str
    is_admin: bool = False


# ============= Grocery List models =============

class GroceryListCreate(BaseModel):
    name: str
    items: List[str] = []


# Both fields are optional; they are passed to update_grocery_list as-is.
class GroceryListUpdate(BaseModel):
    name: Optional[str] = None
    items: Optional[List[str]] = None


class GroceryList(BaseModel):
    id: int
    name: str
    items: List[str]
    owner_id: int
    owner_display_name: Optional[str] = None
    can_edit: bool = False
    is_owner: bool = False
    created_at: str
    updated_at: str


class ShareGroceryList(BaseModel):
    user_identifier: str  # Can be username or display_name
    can_edit: bool = False


class GroceryListShare(BaseModel):
    id: int
    list_id: int
    shared_with_user_id: int
    username: str
    display_name: str
    email: str
    can_edit: bool
    shared_at: str


class UserSearch(BaseModel):
    id: int
    username: str
    display_name: str
    email: str


class Notification(BaseModel):
    id: int
    user_id: int
    type: str
    message: str
    related_id: Optional[int] = None
    is_read: bool
    created_at: str


app = FastAPI(
    title="Random Recipes API",
    description="API פשוט לבחירת מתכונים לצהריים / ערב / כל ארוחה 😋",
)

# Allow CORS from frontend domains
allowed_origins = [
    "http://localhost:5173",
    "http://localhost:3000",
    "https://my-recipes.dvirlabs.com",
    "http://my-recipes.dvirlabs.com",
]

app.add_middleware(
    CORSMiddleware,
    allow_origins=allowed_origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ============= Internal helpers =============

def _recipe_from_row(row: dict, owner_display_name: Optional[str]) -> Recipe:
    """Build a ``Recipe`` response model from a database row.

    ``tags``, ``ingredients`` and ``steps`` may come back as NULL from the
    database, so they are normalised to empty lists.
    """
    return Recipe(
        id=row["id"],
        name=row["name"],
        meal_type=row["meal_type"],
        time_minutes=row["time_minutes"],
        made_by=row.get("made_by"),
        tags=row["tags"] or [],
        ingredients=row["ingredients"] or [],
        steps=row["steps"] or [],
        image=row.get("image"),
        user_id=row.get("user_id"),
        owner_display_name=owner_display_name,
    )


def _require_recipe_write_access(
    recipe_id: int, current_user: dict, forbidden_detail: str
) -> Optional[int]:
    """Ensure ``current_user`` may modify the recipe and return its owner id.

    Access is granted to the recipe owner and to admins.

    Raises:
        HTTPException: 404 if the recipe does not exist, 403 (with
            ``forbidden_detail``) if the user is neither owner nor admin.
    """
    conn = get_conn()
    try:
        with conn.cursor() as cur:
            cur.execute("SELECT user_id FROM recipes WHERE id = %s", (recipe_id,))
            recipe = cur.fetchone()
    finally:
        conn.close()

    if not recipe:
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")

    is_owner = recipe["user_id"] == current_user["user_id"]
    if not is_owner and not current_user.get("is_admin", False):
        raise HTTPException(status_code=403, detail=forbidden_detail)
    return recipe["user_id"]


def _owner_display_name(owner_id: Optional[int], current_user: dict) -> Optional[str]:
    """Return the display name of the recipe owner.

    When the owner is the caller, the name is taken from ``current_user``;
    otherwise (an admin editing someone else's recipe) it is looked up so the
    response does not report the admin as the owner.
    """
    if owner_id is None:
        return None
    if owner_id == current_user["user_id"]:
        return current_user.get("display_name")
    owner = get_user_by_id(owner_id)
    return owner.get("display_name") if owner else None


# ============= Recipe endpoints =============

@app.get("/recipes", response_model=List[Recipe])
def list_recipes():
    """List all recipes"""
    return [
        _recipe_from_row(row, row.get("owner_display_name"))
        for row in list_recipes_db()
    ]


@app.post("/recipes", response_model=Recipe, status_code=201)
def create_recipe(recipe_in: RecipeCreate, current_user: dict = Depends(get_current_user)):
    """Create a recipe owned by the current user"""
    data = recipe_in.model_dump()
    data["meal_type"] = data["meal_type"].lower()
    data["user_id"] = current_user["user_id"]

    row = create_recipe_db(data)
    return _recipe_from_row(row, current_user.get("display_name"))


@app.put("/recipes/{recipe_id}", response_model=Recipe)
def update_recipe(recipe_id: int, recipe_in: RecipeUpdate, current_user: dict = Depends(get_current_user)):
    """Update a recipe (owner or admin only)"""
    # Check ownership BEFORE updating (admins can edit any recipe)
    owner_id = _require_recipe_write_access(
        recipe_id, current_user, "אין לך הרשאה לערוך מתכון זה"
    )

    data = recipe_in.model_dump()
    data["meal_type"] = data["meal_type"].lower()

    row = update_recipe_db(recipe_id, data)
    if not row:
        # The recipe was removed between the ownership check and the update.
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")

    return _recipe_from_row(row, _owner_display_name(owner_id, current_user))


@app.delete("/recipes/{recipe_id}", status_code=204)
def delete_recipe(recipe_id: int, current_user: dict = Depends(get_current_user)):
    """Delete a recipe (owner or admin only)"""
    # Check ownership first (admins can delete any recipe)
    _require_recipe_write_access(
        recipe_id, current_user, "אין לך הרשאה למחוק מתכון זה"
    )

    deleted = delete_recipe_db(recipe_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="המתכון לא נמצא")
    return


@app.get("/recipes/random", response_model=Recipe)
def random_recipe(
    meal_type: Optional[str] = None,
    max_time: Optional[int] = None,
    ingredients: Optional[List[str]] = Query(
        None,
        description="רשימת מרכיבים (ingredients=ביצה&ingredients=עגבניה...)",
    ),
):
    """Return a random recipe matching the given filters"""
    rows = get_recipes_by_filters_db(meal_type, max_time)
    recipes = [
        _recipe_from_row(row, row.get("owner_display_name")) for row in rows
    ]

    # Ingredient filtering is done here in the application: a recipe matches
    # only if it contains every requested ingredient (case-insensitive).
    if ingredients:
        desired = {i.strip().lower() for i in ingredients if i.strip()}
        if desired:
            recipes = [
                r
                for r in recipes
                if desired.issubset({ing.lower() for ing in r.ingredients})
            ]

    if not recipes:
        raise HTTPException(
            status_code=404,
            detail="לא נמצאו מתכונים מתאימים לפילטרים שבחרת",
        )

    return random.choice(recipes)


# ============= Authentication endpoints =============

@app.post("/auth/register", response_model=UserResponse, status_code=201)
def register(user: UserRegister):
    """Register a new user"""
    print(f"[REGISTER] Starting registration for username: {user.username}")

    # Check if username already exists
    print("[REGISTER] Checking if username exists...")
    existing_user = get_user_by_username(user.username)
    if existing_user:
        print("[REGISTER] Username already exists")
        raise HTTPException(
            status_code=400,
            detail="שם המשתמש כבר קיים במערכת"
        )

    # Check if email already exists
    print("[REGISTER] Checking if email exists...")
    existing_email = get_user_by_email(user.email)
    if existing_email:
        print("[REGISTER] Email already exists")
        raise HTTPException(
            status_code=400,
            detail="האימייל כבר רשום במערכת"
        )

    # Check if display_name already exists
    print("[REGISTER] Checking if display_name exists...")
    existing_display_name = get_user_by_display_name(user.display_name)
    if existing_display_name:
        print("[REGISTER] Display name already exists")
        raise HTTPException(
            status_code=400,
            detail="שם התצוגה כבר קיים במערכת"
        )

    # Hash password and create user
    print("[REGISTER] Hashing password...")
    password_hash = hash_password(user.password)
    print("[REGISTER] Creating user in database...")
    new_user = create_user(
        user.username,
        user.email,
        password_hash,
        user.first_name,
        user.last_name,
        user.display_name
    )
    print(f"[REGISTER] User created successfully: {new_user['id']}")

    return UserResponse(
        id=new_user["id"],
        username=new_user["username"],
        email=new_user["email"],
        first_name=new_user.get("first_name"),
        last_name=new_user.get("last_name"),
        display_name=new_user["display_name"],
        is_admin=new_user.get("is_admin", False)
    )


@app.post("/auth/login", response_model=Token)
def login(user: UserLogin):
    """Login user and return JWT token"""
    # Get user from database
    db_user = get_user_by_username(user.username)
    if not db_user:
        raise HTTPException(
            status_code=401,
            detail="שם משתמש או סיסמה שגויים"
        )

    # Verify password (same message as above so the response does not reveal
    # whether the username exists)
    if not verify_password(user.password, db_user["password_hash"]):
        raise HTTPException(
            status_code=401,
            detail="שם משתמש או סיסמה שגויים"
        )

    # Create access token
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": str(db_user["id"]), "username": db_user["username"]},
        expires_delta=access_token_expires
    )

    return Token(access_token=access_token, token_type="bearer")


@app.get("/auth/me", response_model=UserResponse)
def get_me(current_user: dict = Depends(get_current_user)):
    """Get current logged-in user info"""
    user = get_user_by_id(current_user["user_id"])
    if not user:
        raise HTTPException(status_code=404, detail="משתמש לא נמצא")

    return UserResponse(
        id=user["id"],
        username=user["username"],
        email=user["email"],
        first_name=user.get("first_name"),
        last_name=user.get("last_name"),
        display_name=user["display_name"],
        is_admin=user.get("is_admin", False)
    )


# ============= Grocery Lists Endpoints =============

@app.get("/grocery-lists", response_model=List[GroceryList])
def list_grocery_lists(current_user: dict = Depends(get_current_user)):
    """Get all grocery lists owned by or shared with the current user"""
    lists = get_user_grocery_lists(current_user["user_id"])
    # Convert datetime objects to strings
    for lst in lists:
        lst["created_at"] = str(lst["created_at"])
        lst["updated_at"] = str(lst["updated_at"])
    return lists


@app.post("/grocery-lists", response_model=GroceryList, status_code=201)
def create_new_grocery_list(
    grocery_list: GroceryListCreate,
    current_user: dict = Depends(get_current_user)
):
    """Create a new grocery list"""
    new_list = create_grocery_list(
        owner_id=current_user["user_id"],
        name=grocery_list.name,
        items=grocery_list.items
    )
    # The creator is always the owner and can edit.
    new_list["owner_display_name"] = current_user.get("display_name")
    new_list["can_edit"] = True
    new_list["is_owner"] = True
    new_list["created_at"] = str(new_list["created_at"])
    new_list["updated_at"] = str(new_list["updated_at"])
    return new_list


@app.get("/grocery-lists/{list_id}", response_model=GroceryList)
def get_grocery_list(list_id: int, current_user: dict = Depends(get_current_user)):
    """Get a specific grocery list"""
    grocery_list = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not grocery_list:
        raise HTTPException(
            status_code=404,
            detail="רשימת קניות לא נמצאה או שאין לך גישה אליה",
        )

    grocery_list["created_at"] = str(grocery_list["created_at"])
    grocery_list["updated_at"] = str(grocery_list["updated_at"])
    return grocery_list


@app.put("/grocery-lists/{list_id}", response_model=GroceryList)
def update_grocery_list_endpoint(
    list_id: int,
    grocery_list_update: GroceryListUpdate,
    current_user: dict = Depends(get_current_user)
):
    """Update a grocery list"""
    # Check if user has access
    existing = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not existing:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")

    # Check if user has edit permission
    if not existing["can_edit"]:
        raise HTTPException(status_code=403, detail="אין לך הרשאה לערוך רשימת קניות זו")

    updated = update_grocery_list(
        list_id=list_id,
        name=grocery_list_update.name,
        items=grocery_list_update.items
    )
    if not updated:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")

    # Get full details with permissions
    result = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not result:
        # The list was deleted or unshared between the update and this read.
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")
    result["created_at"] = str(result["created_at"])
    result["updated_at"] = str(result["updated_at"])
    return result


@app.delete("/grocery-lists/{list_id}", status_code=204)
def delete_grocery_list_endpoint(list_id: int, current_user: dict = Depends(get_current_user)):
    """Delete a grocery list (owner only)"""
    # Check if user is owner
    existing = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not existing:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")
    if not existing["is_owner"]:
        raise HTTPException(status_code=403, detail="רק הבעלים יכול למחוק רשימת קניות")

    deleted = delete_grocery_list(list_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")
    return


@app.post("/grocery-lists/{list_id}/share", response_model=GroceryListShare)
def share_grocery_list_endpoint(
    list_id: int,
    share_data: ShareGroceryList,
    current_user: dict = Depends(get_current_user)
):
    """Share a grocery list with another user"""
    # Check if user is owner
    existing = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not existing:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")
    if not existing["is_owner"]:
        raise HTTPException(status_code=403, detail="רק הבעלים יכול לשתף רשימת קניות")

    # Find user by username, falling back to display_name
    target_user = get_user_by_username(share_data.user_identifier)
    if not target_user:
        target_user = get_user_by_display_name(share_data.user_identifier)
    if not target_user:
        raise HTTPException(status_code=404, detail="משתמש לא נמצא")

    # Don't allow sharing with yourself
    if target_user["id"] == current_user["user_id"]:
        raise HTTPException(status_code=400, detail="לא ניתן לשתף רשימה עם עצמך")

    # Share the list
    share = share_grocery_list(list_id, target_user["id"], share_data.can_edit)

    # Create notification for the user who received the share
    notification_message = f"רשימת קניות '{existing['name']}' שותפה איתך על ידי {current_user['display_name']}"
    create_notification(
        user_id=target_user["id"],
        type="grocery_share",
        message=notification_message,
        related_id=list_id
    )

    # Return with user details
    return GroceryListShare(
        id=share["id"],
        list_id=share["list_id"],
        shared_with_user_id=share["shared_with_user_id"],
        username=target_user["username"],
        display_name=target_user["display_name"],
        email=target_user["email"],
        can_edit=share["can_edit"],
        shared_at=str(share["shared_at"])
    )


@app.get("/grocery-lists/{list_id}/shares", response_model=List[GroceryListShare])
def get_grocery_list_shares_endpoint(
    list_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Get all users a grocery list is shared with"""
    # Check if user is owner
    existing = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not existing:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")
    if not existing["is_owner"]:
        raise HTTPException(status_code=403, detail="רק הבעלים יכול לראות את רשימת השיתופים")

    shares = get_grocery_list_shares(list_id)
    return [
        GroceryListShare(
            id=share["id"],
            list_id=share["list_id"],
            shared_with_user_id=share["shared_with_user_id"],
            username=share["username"],
            display_name=share["display_name"],
            email=share["email"],
            can_edit=share["can_edit"],
            shared_at=str(share["shared_at"])
        )
        for share in shares
    ]


@app.delete("/grocery-lists/{list_id}/shares/{user_id}", status_code=204)
def unshare_grocery_list_endpoint(
    list_id: int,
    user_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Remove sharing access for a user"""
    # Check if user is owner
    existing = get_grocery_list_by_id(list_id, current_user["user_id"])
    if not existing:
        raise HTTPException(status_code=404, detail="רשימת קניות לא נמצאה")
    if not existing["is_owner"]:
        raise HTTPException(status_code=403, detail="רק הבעלים יכול להסיר שיתופים")

    deleted = unshare_grocery_list(list_id, user_id)
    if not deleted:
        raise HTTPException(status_code=404, detail="שיתוף לא נמצא")
    return


@app.get("/users/search", response_model=List[UserSearch])
def search_users_endpoint(
    q: str = Query(..., min_length=1, description="Search query for username or display name"),
    current_user: dict = Depends(get_current_user)
):
    """Search users by username or display name for autocomplete"""
    # current_user is unused here; the dependency only enforces authentication.
    users = search_users(q)
    return [
        UserSearch(
            id=user["id"],
            username=user["username"],
            display_name=user["display_name"],
            email=user["email"]
        )
        for user in users
    ]


# ===========================
# Notification Endpoints
# ===========================

@app.get("/notifications", response_model=List[Notification])
def get_notifications_endpoint(
    unread_only: bool = Query(False, description="Get only unread notifications"),
    current_user: dict = Depends(get_current_user)
):
    """Get all notifications for the current user"""
    notifications = get_user_notifications(current_user["user_id"], unread_only)
    return [
        Notification(
            id=notif["id"],
            user_id=notif["user_id"],
            type=notif["type"],
            message=notif["message"],
            related_id=notif["related_id"],
            is_read=notif["is_read"],
            created_at=str(notif["created_at"])
        )
        for notif in notifications
    ]


@app.patch("/notifications/{notification_id}/read")
def mark_notification_read_endpoint(
    notification_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Mark a notification as read"""
    mark_notification_as_read(notification_id, current_user["user_id"])
    return {"message": "Notification marked as read"}


@app.patch("/notifications/read-all")
def mark_all_notifications_read_endpoint(
    current_user: dict = Depends(get_current_user)
):
    """Mark all notifications as read"""
    mark_all_notifications_as_read(current_user["user_id"])
    return {"message": "All notifications marked as read"}


@app.delete("/notifications/{notification_id}")
def delete_notification_endpoint(
    notification_id: int,
    current_user: dict = Depends(get_current_user)
):
    """Delete a notification"""
    delete_notification(notification_id, current_user["user_id"])
    return {"message": "Notification deleted"}


if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)