my-recipes/backend/groups_db_utils.py

381 lines
12 KiB
Python

import os
import psycopg2
from psycopg2.extras import RealDictCursor
from typing import List, Optional
def get_db_connection():
    """Open a new PostgreSQL connection configured from the environment.

    Reads DB_HOST, DB_PORT, DB_NAME, DB_USER and DB_PASSWORD, falling back
    to a built-in default for any variable that is unset. Nothing in this
    function closes the connection; that is left to the caller.
    """
    settings = {
        "host": os.getenv("DB_HOST", "localhost"),
        # The environment only carries strings; the port is passed as an int.
        "port": int(os.getenv("DB_PORT", "5432")),
        "database": os.getenv("DB_NAME", "recipes_db"),
        "user": os.getenv("DB_USER", "recipes_user"),
        "password": os.getenv("DB_PASSWORD", "recipes_password"),
    }
    return psycopg2.connect(**settings)
# ============= Groups =============
def create_group(name: str, description: str, created_by: int, is_private: bool = False):
    """Insert a group and register its creator as an admin member.

    Both inserts run on the same connection and are committed together.
    Returns the new group row (id, name, description, created_by,
    is_private, created_at) as a plain dict.
    """
    insert_group_sql = (
        "\n"
        "INSERT INTO groups (name, description, created_by, is_private)\n"
        "VALUES (%s, %s, %s, %s)\n"
        "RETURNING id, name, description, created_by, is_private, created_at\n"
    )
    insert_admin_sql = (
        "INSERT INTO group_members (group_id, user_id, role) VALUES (%s, %s, 'admin')"
    )

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        cursor.execute(insert_group_sql, (name, description, created_by, is_private))
        new_group = dict(cursor.fetchone())

        # The creator is always enrolled as the group's first admin.
        cursor.execute(insert_admin_sql, (new_group["id"], created_by))

        connection.commit()
        return new_group
    finally:
        cursor.close()
        connection.close()
def get_group(group_id: int):
    """Fetch one group together with its member list.

    Returns ``None`` when no group has the given id. Otherwise returns a
    dict of the group columns plus ``members`` (member dicts ordered by
    role, then display name) and ``member_count``.
    """
    group_sql = (
        "SELECT id, name, description, created_by, is_private, created_at FROM groups WHERE id = %s"
    )
    members_sql = (
        "\n"
        "SELECT gm.role, gm.joined_at, u.id, u.username, u.display_name\n"
        "FROM group_members gm\n"
        "JOIN users u ON u.id = gm.user_id\n"
        "WHERE gm.group_id = %s\n"
        "ORDER BY gm.role, u.display_name\n"
    )

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        cursor.execute(group_sql, (group_id,))
        group_row = cursor.fetchone()
        if not group_row:
            return None

        cursor.execute(members_sql, (group_id,))
        member_list = list(map(dict, cursor.fetchall()))

        details = dict(group_row)
        details.update(members=member_list, member_count=len(member_list))
        return details
    finally:
        cursor.close()
        connection.close()
def get_user_groups(user_id: int):
    """List every group the user belongs to, ordered by group name.

    Each entry is a dict with the group columns, the user's ``role`` in that
    group, and a ``member_count`` computed by a correlated subquery.
    """
    memberships_sql = (
        "\n"
        "SELECT g.id, g.name, g.description, g.is_private, g.created_at, gm.role,\n"
        "(SELECT COUNT(*) FROM group_members WHERE group_id = g.id) AS member_count\n"
        "FROM groups g\n"
        "JOIN group_members gm ON g.id = gm.group_id\n"
        "WHERE gm.user_id = %s\n"
        "ORDER BY g.name\n"
    )

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        cursor.execute(memberships_sql, (user_id,))
        rows = cursor.fetchall()
        return list(map(dict, rows))
    finally:
        cursor.close()
        connection.close()
def add_group_member(group_id: int, user_id: int, added_by: int):
    """Add ``user_id`` to a group as a plain member.

    ``added_by`` must hold the ``admin`` or ``moderator`` role in the group.
    Returns ``{"success": True}`` when a membership row was inserted, or
    ``{"error": <message>}`` when the adder lacks permission or the user is
    already a member.
    """
    role_sql = "SELECT role FROM group_members WHERE group_id = %s AND user_id = %s"
    insert_sql = (
        "\n"
        "INSERT INTO group_members (group_id, user_id, role)\n"
        "VALUES (%s, %s, 'member')\n"
        "ON CONFLICT (group_id, user_id) DO NOTHING\n"
        "RETURNING id\n"
    )

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        # Only privileged members of this group may add others.
        cursor.execute(role_sql, (group_id, added_by))
        adder_row = cursor.fetchone()
        may_add = bool(adder_row) and adder_row["role"] in ("admin", "moderator")
        if not may_add:
            return {"error": "Only admins and moderators can add members"}

        # ON CONFLICT DO NOTHING returns no row when the membership exists.
        cursor.execute(insert_sql, (group_id, user_id))
        inserted = cursor.fetchone()
        connection.commit()

        if not inserted:
            return {"error": "User is already a member"}
        return {"success": True}
    finally:
        cursor.close()
        connection.close()
def remove_group_member(group_id: int, user_id: int, removed_by: int) -> dict:
    """Remove a member from a group.

    A user may always remove themselves. Removing anyone else requires
    ``removed_by`` to hold the ``admin`` or ``moderator`` role in the group.

    Returns:
        ``{"success": True}`` when a membership row was deleted, or
        ``{"error": <message>}`` when the remover lacks permission or the
        target user is not a member of the group.
    """
    conn = get_db_connection()
    cur = conn.cursor(cursor_factory=RealDictCursor)
    try:
        # The role lookup is only needed when removing someone else.
        if removed_by != user_id:
            cur.execute(
                "SELECT role FROM group_members WHERE group_id = %s AND user_id = %s",
                (group_id, removed_by),
            )
            remover = cur.fetchone()
            if not remover or remover["role"] not in ("admin", "moderator"):
                return {"error": "Only admins and moderators can remove members"}

        cur.execute(
            "DELETE FROM group_members WHERE group_id = %s AND user_id = %s",
            (group_id, user_id),
        )
        # rowcount is the number of rows the DELETE affected; zero means the
        # target was not a member, so do not report a removal that never
        # happened.
        if cur.rowcount == 0:
            return {"error": "User is not a member of this group"}

        conn.commit()
        return {"success": True}
    finally:
        cur.close()
        conn.close()
# ============= Recipe Ratings & Comments =============
def add_or_update_rating(recipe_id: int, user_id: int, rating: int):
    """Upsert a user's rating for a recipe.

    Inserts a new rating. On a conflict on (recipe_id, user_id) it instead
    overwrites the stored rating and refreshes ``updated_at``. Returns the
    resulting row as a dict.
    """
    upsert_sql = (
        "\n"
        "INSERT INTO recipe_ratings (recipe_id, user_id, rating)\n"
        "VALUES (%s, %s, %s)\n"
        "ON CONFLICT (recipe_id, user_id)\n"
        "DO UPDATE SET rating = EXCLUDED.rating, updated_at = CURRENT_TIMESTAMP\n"
        "RETURNING id, recipe_id, user_id, rating, created_at, updated_at\n"
    )

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        cursor.execute(upsert_sql, (recipe_id, user_id, rating))
        saved_row = cursor.fetchone()
        connection.commit()
        return dict(saved_row)
    finally:
        cursor.close()
        connection.close()
def get_recipe_rating_stats(recipe_id: int):
    """Aggregate the ratings of a recipe.

    Returns a dict with ``rating_count``, ``average_rating`` (cast to
    DECIMAL(3,2) by the query) and one counter per star value from
    ``five_star`` down to ``one_star``.
    """
    stats_sql = (
        "\n"
        "SELECT\n"
        "COUNT(*) as rating_count,\n"
        "AVG(rating)::DECIMAL(3,2) as average_rating,\n"
        "COUNT(CASE WHEN rating = 5 THEN 1 END) as five_star,\n"
        "COUNT(CASE WHEN rating = 4 THEN 1 END) as four_star,\n"
        "COUNT(CASE WHEN rating = 3 THEN 1 END) as three_star,\n"
        "COUNT(CASE WHEN rating = 2 THEN 1 END) as two_star,\n"
        "COUNT(CASE WHEN rating = 1 THEN 1 END) as one_star\n"
        "FROM recipe_ratings\n"
        "WHERE recipe_id = %s\n"
    )

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        cursor.execute(stats_sql, (recipe_id,))
        stats_row = cursor.fetchone()
        return dict(stats_row)
    finally:
        cursor.close()
        connection.close()
def get_user_recipe_rating(recipe_id: int, user_id: int):
    """Look up the rating a specific user gave a recipe.

    Returns ``{"rating": <value>}`` when a rating row exists, otherwise
    ``None``.
    """
    rating_sql = "SELECT rating FROM recipe_ratings WHERE recipe_id = %s AND user_id = %s"

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        cursor.execute(rating_sql, (recipe_id, user_id))
        row = cursor.fetchone()
        if not row:
            return None
        return dict(row)
    finally:
        cursor.close()
        connection.close()
def add_comment(recipe_id: int, user_id: int, content: str, parent_comment_id: Optional[int] = None):
    """Store a new comment on a recipe.

    ``parent_comment_id`` is written as given; it defaults to ``None``.
    Returns the inserted row (id, recipe_id, user_id, content,
    parent_comment_id, created_at) as a dict.
    """
    insert_sql = (
        "\n"
        "INSERT INTO recipe_comments (recipe_id, user_id, content, parent_comment_id)\n"
        "VALUES (%s, %s, %s, %s)\n"
        "RETURNING id, recipe_id, user_id, content, parent_comment_id, created_at\n"
    )
    params = (recipe_id, user_id, content, parent_comment_id)

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        cursor.execute(insert_sql, params)
        new_comment = cursor.fetchone()
        connection.commit()
        return dict(new_comment)
    finally:
        cursor.close()
        connection.close()
def get_recipe_comments(recipe_id: int):
    """Return the non-deleted comments of a recipe, oldest first.

    Each entry is a dict holding the comment columns plus the author's
    ``user_id``, ``username`` and ``display_name``.
    """
    comments_sql = (
        "\n"
        "SELECT c.id, c.content, c.parent_comment_id, c.created_at, c.updated_at,\n"
        "u.id AS user_id, u.username, u.display_name\n"
        "FROM recipe_comments c\n"
        "JOIN users u ON u.id = c.user_id\n"
        "WHERE c.recipe_id = %s AND c.is_deleted = FALSE\n"
        "ORDER BY c.created_at ASC\n"
    )

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        cursor.execute(comments_sql, (recipe_id,))
        rows = cursor.fetchall()
        return list(map(dict, rows))
    finally:
        cursor.close()
        connection.close()
def update_comment(comment_id: int, user_id: int, content: str) -> dict:
    """Replace the text of a comment owned by ``user_id``.

    The UPDATE only matches a comment that has the given id, belongs to
    ``user_id`` and is not soft-deleted.

    Returns:
        ``{"success": True}`` when a comment was updated, or
        ``{"error": <message>}`` when no matching comment exists (wrong id,
        owned by another user, or already deleted).
    """
    update_sql = (
        "\n"
        "UPDATE recipe_comments\n"
        "SET content = %s, updated_at = CURRENT_TIMESTAMP\n"
        "WHERE id = %s AND user_id = %s AND is_deleted = FALSE\n"
    )

    conn = get_db_connection()
    cur = conn.cursor()
    try:
        cur.execute(update_sql, (content, comment_id, user_id))
        # rowcount is the number of rows the UPDATE affected; zero means the
        # WHERE clause matched nothing, so reporting success would mislead
        # the caller.
        if cur.rowcount == 0:
            return {"error": "Comment not found or cannot be edited by this user"}

        conn.commit()
        return {"success": True}
    finally:
        cur.close()
        conn.close()
def delete_comment(comment_id: int, user_id: int) -> dict:
    """Soft-delete a comment owned by ``user_id``.

    The row is kept and only flagged ``is_deleted = TRUE``. The statement
    does not filter on ``is_deleted``, so repeating the call on an already
    deleted comment still matches the row and succeeds.

    Returns:
        ``{"success": True}`` when a comment row was flagged, or
        ``{"error": <message>}`` when no comment with that id belongs to
        the user.
    """
    conn = get_db_connection()
    cur = conn.cursor()
    try:
        cur.execute(
            "UPDATE recipe_comments SET is_deleted = TRUE WHERE id = %s AND user_id = %s",
            (comment_id, user_id),
        )
        # Zero affected rows means the comment does not exist or belongs to
        # someone else; surface that instead of claiming success.
        if cur.rowcount == 0:
            return {"error": "Comment not found or cannot be deleted by this user"}

        conn.commit()
        return {"success": True}
    finally:
        cur.close()
        conn.close()
# ============= Recipe Shares to Groups =============
def share_recipe_to_group(recipe_id: int, group_id: int, user_id: int):
    """Share a recipe with a group on behalf of one of its members.

    Returns ``{"success": True}`` when a share row was inserted, or
    ``{"error": <message>}`` when the user is not a member of the group or
    the recipe is already shared to it.
    """
    membership_sql = "SELECT 1 FROM group_members WHERE group_id = %s AND user_id = %s"
    share_sql = (
        "\n"
        "INSERT INTO recipe_shares (recipe_id, group_id, shared_by)\n"
        "VALUES (%s, %s, %s)\n"
        "ON CONFLICT (recipe_id, group_id) DO NOTHING\n"
        "RETURNING id\n"
    )

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        # Only members of the group may share into it.
        cursor.execute(membership_sql, (group_id, user_id))
        is_member = cursor.fetchone()
        if not is_member:
            return {"error": "Not a member of this group"}

        # ON CONFLICT DO NOTHING returns no row when the share already exists.
        cursor.execute(share_sql, (recipe_id, group_id, user_id))
        inserted = cursor.fetchone()
        connection.commit()

        if not inserted:
            return {"error": "Recipe already shared to this group"}
        return {"success": True}
    finally:
        cursor.close()
        connection.close()
def get_group_recipes(group_id: int, user_id: int):
    """List the recipes shared to a group, most recently shared first.

    The requesting user must be a member of the group; otherwise
    ``{"error": "Not a member of this group"}`` is returned instead of a
    list. Each list entry is a dict with all recipe columns plus the owner's
    and the sharer's username and display name and the share metadata.
    """
    membership_sql = "SELECT 1 FROM group_members WHERE group_id = %s AND user_id = %s"
    recipes_sql = (
        "\n"
        "SELECT r.*, u.username AS owner_username, u.display_name AS owner_display_name,\n"
        "rs.shared_at, rs.shared_by,\n"
        "u2.username AS shared_by_username, u2.display_name AS shared_by_display_name\n"
        "FROM recipe_shares rs\n"
        "JOIN recipes r ON r.id = rs.recipe_id\n"
        "JOIN users u ON u.id = r.user_id\n"
        "JOIN users u2 ON u2.id = rs.shared_by\n"
        "WHERE rs.group_id = %s\n"
        "ORDER BY rs.shared_at DESC\n"
    )

    connection = get_db_connection()
    cursor = connection.cursor(cursor_factory=RealDictCursor)
    try:
        # Non-members get an error dict rather than the recipe list.
        cursor.execute(membership_sql, (group_id, user_id))
        is_member = cursor.fetchone()
        if not is_member:
            return {"error": "Not a member of this group"}

        cursor.execute(recipes_sql, (group_id,))
        shared_rows = cursor.fetchall()
        return list(map(dict, shared_rows))
    finally:
        cursor.close()
        connection.close()