import os import psycopg2 from psycopg2.extras import RealDictCursor from typing import List, Optional def get_db_connection(): """Get database connection""" return psycopg2.connect( host=os.getenv("DB_HOST", "localhost"), port=int(os.getenv("DB_PORT", "5432")), database=os.getenv("DB_NAME", "recipes_db"), user=os.getenv("DB_USER", "recipes_user"), password=os.getenv("DB_PASSWORD", "recipes_password"), ) # ============= Groups ============= def create_group(name: str, description: str, created_by: int, is_private: bool = False): """Create a new group""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: cur.execute( """ INSERT INTO groups (name, description, created_by, is_private) VALUES (%s, %s, %s, %s) RETURNING id, name, description, created_by, is_private, created_at """, (name, description, created_by, is_private) ) group = dict(cur.fetchone()) # Add creator as admin cur.execute( "INSERT INTO group_members (group_id, user_id, role) VALUES (%s, %s, 'admin')", (group["id"], created_by) ) conn.commit() return group finally: cur.close() conn.close() def get_group(group_id: int): """Get group details""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: cur.execute( "SELECT id, name, description, created_by, is_private, created_at FROM groups WHERE id = %s", (group_id,) ) group = cur.fetchone() if not group: return None # Get members cur.execute( """ SELECT gm.role, gm.joined_at, u.id, u.username, u.display_name FROM group_members gm JOIN users u ON u.id = gm.user_id WHERE gm.group_id = %s ORDER BY gm.role, u.display_name """, (group_id,) ) members = [dict(row) for row in cur.fetchall()] result = dict(group) result["members"] = members result["member_count"] = len(members) return result finally: cur.close() conn.close() def get_user_groups(user_id: int): """Get all groups user is member of""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: cur.execute( """ SELECT g.id, g.name, g.description, g.is_private, g.created_at, gm.role, (SELECT COUNT(*) FROM group_members WHERE group_id = g.id) AS member_count FROM groups g JOIN group_members gm ON g.id = gm.group_id WHERE gm.user_id = %s ORDER BY g.name """, (user_id,) ) return [dict(row) for row in cur.fetchall()] finally: cur.close() conn.close() def add_group_member(group_id: int, user_id: int, added_by: int): """Add a member to a group""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: # Check if added_by is admin/moderator cur.execute( "SELECT role FROM group_members WHERE group_id = %s AND user_id = %s", (group_id, added_by) ) adder = cur.fetchone() if not adder or adder["role"] not in ["admin", "moderator"]: return {"error": "Only admins and moderators can add members"} cur.execute( """ INSERT INTO group_members (group_id, user_id, role) VALUES (%s, %s, 'member') ON CONFLICT (group_id, user_id) DO NOTHING RETURNING id """, (group_id, user_id) ) result = cur.fetchone() conn.commit() if result: return {"success": True} else: return {"error": "User is already a member"} finally: cur.close() conn.close() def remove_group_member(group_id: int, user_id: int, removed_by: int): """Remove a member from a group""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: # Check permissions cur.execute( "SELECT role FROM group_members WHERE group_id = %s AND user_id = %s", (group_id, removed_by) ) remover = cur.fetchone() # User can remove themselves, or admins/moderators can remove others if removed_by != user_id: if not remover or remover["role"] not in ["admin", "moderator"]: return {"error": "Only admins and moderators can remove members"} cur.execute( "DELETE FROM group_members WHERE group_id = %s AND user_id = %s", (group_id, user_id) ) conn.commit() return {"success": True} finally: cur.close() conn.close() # ============= Recipe Ratings & Comments ============= def add_or_update_rating(recipe_id: int, user_id: int, rating: int): """Add or update a rating for a recipe""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: cur.execute( """ INSERT INTO recipe_ratings (recipe_id, user_id, rating) VALUES (%s, %s, %s) ON CONFLICT (recipe_id, user_id) DO UPDATE SET rating = EXCLUDED.rating, updated_at = CURRENT_TIMESTAMP RETURNING id, recipe_id, user_id, rating, created_at, updated_at """, (recipe_id, user_id, rating) ) result = cur.fetchone() conn.commit() return dict(result) finally: cur.close() conn.close() def get_recipe_rating_stats(recipe_id: int): """Get rating statistics for a recipe""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: cur.execute( """ SELECT COUNT(*) as rating_count, AVG(rating)::DECIMAL(3,2) as average_rating, COUNT(CASE WHEN rating = 5 THEN 1 END) as five_star, COUNT(CASE WHEN rating = 4 THEN 1 END) as four_star, COUNT(CASE WHEN rating = 3 THEN 1 END) as three_star, COUNT(CASE WHEN rating = 2 THEN 1 END) as two_star, COUNT(CASE WHEN rating = 1 THEN 1 END) as one_star FROM recipe_ratings WHERE recipe_id = %s """, (recipe_id,) ) return dict(cur.fetchone()) finally: cur.close() conn.close() def get_user_recipe_rating(recipe_id: int, user_id: int): """Get user's rating for a recipe""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: cur.execute( "SELECT rating FROM recipe_ratings WHERE recipe_id = %s AND user_id = %s", (recipe_id, user_id) ) result = cur.fetchone() return dict(result) if result else None finally: cur.close() conn.close() def add_comment(recipe_id: int, user_id: int, content: str, parent_comment_id: Optional[int] = None): """Add a comment to a recipe""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: cur.execute( """ INSERT INTO recipe_comments (recipe_id, user_id, content, parent_comment_id) VALUES (%s, %s, %s, %s) RETURNING id, recipe_id, user_id, content, parent_comment_id, created_at """, (recipe_id, user_id, content, parent_comment_id) ) result = cur.fetchone() conn.commit() return dict(result) finally: cur.close() conn.close() def get_recipe_comments(recipe_id: int): """Get all comments for a recipe""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: cur.execute( """ SELECT c.id, c.content, c.parent_comment_id, c.created_at, c.updated_at, u.id AS user_id, u.username, u.display_name FROM recipe_comments c JOIN users u ON u.id = c.user_id WHERE c.recipe_id = %s AND c.is_deleted = FALSE ORDER BY c.created_at ASC """, (recipe_id,) ) return [dict(row) for row in cur.fetchall()] finally: cur.close() conn.close() def update_comment(comment_id: int, user_id: int, content: str): """Update a comment""" conn = get_db_connection() cur = conn.cursor() try: cur.execute( """ UPDATE recipe_comments SET content = %s, updated_at = CURRENT_TIMESTAMP WHERE id = %s AND user_id = %s AND is_deleted = FALSE """, (content, comment_id, user_id) ) conn.commit() return {"success": True} finally: cur.close() conn.close() def delete_comment(comment_id: int, user_id: int): """Soft delete a comment""" conn = get_db_connection() cur = conn.cursor() try: cur.execute( "UPDATE recipe_comments SET is_deleted = TRUE WHERE id = %s AND user_id = %s", (comment_id, user_id) ) conn.commit() return {"success": True} finally: cur.close() conn.close() # ============= Recipe Shares to Groups ============= def share_recipe_to_group(recipe_id: int, group_id: int, user_id: int): """Share a recipe to a group""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: # Check if user is member of group cur.execute( "SELECT 1 FROM group_members WHERE group_id = %s AND user_id = %s", (group_id, user_id) ) if not cur.fetchone(): return {"error": "Not a member of this group"} cur.execute( """ INSERT INTO recipe_shares (recipe_id, group_id, shared_by) VALUES (%s, %s, %s) ON CONFLICT (recipe_id, group_id) DO NOTHING RETURNING id """, (recipe_id, group_id, user_id) ) result = cur.fetchone() conn.commit() if result: return {"success": True} else: return {"error": "Recipe already shared to this group"} finally: cur.close() conn.close() def get_group_recipes(group_id: int, user_id: int): """Get all recipes shared to a group""" conn = get_db_connection() cur = conn.cursor(cursor_factory=RealDictCursor) try: # Verify user is member cur.execute( "SELECT 1 FROM group_members WHERE group_id = %s AND user_id = %s", (group_id, user_id) ) if not cur.fetchone(): return {"error": "Not a member of this group"} cur.execute( """ SELECT r.*, u.username AS owner_username, u.display_name AS owner_display_name, rs.shared_at, rs.shared_by, u2.username AS shared_by_username, u2.display_name AS shared_by_display_name FROM recipe_shares rs JOIN recipes r ON r.id = rs.recipe_id JOIN users u ON u.id = r.user_id JOIN users u2 ON u2.id = rs.shared_by WHERE rs.group_id = %s ORDER BY rs.shared_at DESC """, (group_id,) ) return [dict(row) for row in cur.fetchall()] finally: cur.close() conn.close()