my-recipes/backend/social_db_utils.py
2025-12-19 16:17:17 +02:00

203 lines
6.6 KiB
Python

import os
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2 import errors
def get_db_connection():
"""Get database connection"""
return psycopg2.connect(
host=os.getenv("DB_HOST", "localhost"),
port=int(os.getenv("DB_PORT", "5432")),
database=os.getenv("DB_NAME", "recipes_db"),
user=os.getenv("DB_USER", "recipes_user"),
password=os.getenv("DB_PASSWORD", "recipes_password"),
)
# ============= Friends System =============
def send_friend_request(sender_id: int, receiver_id: int):
"""Send a friend request"""
conn = get_db_connection()
cur = conn.cursor(cursor_factory=RealDictCursor)
try:
# Check if already friends
cur.execute(
"SELECT 1 FROM friendships WHERE (user_id = %s AND friend_id = %s) OR (user_id = %s AND friend_id = %s)",
(sender_id, receiver_id, receiver_id, sender_id)
)
if cur.fetchone():
return {"error": "Already friends"}
# Check if request already exists
cur.execute(
"SELECT * FROM friend_requests WHERE sender_id = %s AND receiver_id = %s AND status = 'pending'",
(sender_id, receiver_id)
)
existing = cur.fetchone()
if existing:
return dict(existing)
try:
cur.execute(
"""
INSERT INTO friend_requests (sender_id, receiver_id)
VALUES (%s, %s)
RETURNING id, sender_id, receiver_id, status, created_at
""",
(sender_id, receiver_id)
)
request = cur.fetchone()
conn.commit()
return dict(request)
except errors.UniqueViolation:
# Request already exists, fetch and return it
conn.rollback()
cur.execute(
"SELECT id, sender_id, receiver_id, status, created_at FROM friend_requests WHERE sender_id = %s AND receiver_id = %s",
(sender_id, receiver_id)
)
existing_request = cur.fetchone()
if existing_request:
return dict(existing_request)
return {"error": "Friend request already exists"}
finally:
cur.close()
conn.close()
def accept_friend_request(request_id: int):
"""Accept a friend request and create friendship"""
conn = get_db_connection()
cur = conn.cursor(cursor_factory=RealDictCursor)
try:
# Get request details
cur.execute(
"SELECT sender_id, receiver_id FROM friend_requests WHERE id = %s AND status = 'pending'",
(request_id,)
)
request = cur.fetchone()
if not request:
return {"error": "Request not found or already processed"}
sender_id = request["sender_id"]
receiver_id = request["receiver_id"]
# Create bidirectional friendship
cur.execute(
"INSERT INTO friendships (user_id, friend_id) VALUES (%s, %s), (%s, %s) ON CONFLICT DO NOTHING",
(sender_id, receiver_id, receiver_id, sender_id)
)
# Update request status
cur.execute(
"UPDATE friend_requests SET status = 'accepted', updated_at = CURRENT_TIMESTAMP WHERE id = %s",
(request_id,)
)
conn.commit()
return {"success": True}
finally:
cur.close()
conn.close()
def reject_friend_request(request_id: int):
"""Reject a friend request"""
conn = get_db_connection()
cur = conn.cursor()
try:
cur.execute(
"UPDATE friend_requests SET status = 'rejected', updated_at = CURRENT_TIMESTAMP WHERE id = %s AND status = 'pending'",
(request_id,)
)
conn.commit()
return {"success": True}
finally:
cur.close()
conn.close()
def get_friend_requests(user_id: int):
"""Get pending friend requests for a user"""
conn = get_db_connection()
cur = conn.cursor(cursor_factory=RealDictCursor)
try:
cur.execute(
"""
SELECT fr.id AS request_id, fr.sender_id, fr.receiver_id, fr.status, fr.created_at,
u.username AS sender_username, u.display_name AS sender_display_name, u.email AS sender_email
FROM friend_requests fr
JOIN users u ON u.id = fr.sender_id
WHERE fr.receiver_id = %s AND fr.status = 'pending'
ORDER BY fr.created_at DESC
""",
(user_id,)
)
return [dict(row) for row in cur.fetchall()]
finally:
cur.close()
conn.close()
def get_friends(user_id: int):
"""Get list of user's friends"""
conn = get_db_connection()
cur = conn.cursor(cursor_factory=RealDictCursor)
try:
cur.execute(
"""
SELECT u.id, u.username, u.display_name, u.email, f.created_at AS friends_since
FROM friendships f
JOIN users u ON u.id = f.friend_id
WHERE f.user_id = %s
ORDER BY u.display_name
""",
(user_id,)
)
return [dict(row) for row in cur.fetchall()]
finally:
cur.close()
conn.close()
def remove_friend(user_id: int, friend_id: int):
"""Remove a friend"""
conn = get_db_connection()
cur = conn.cursor()
try:
cur.execute(
"DELETE FROM friendships WHERE (user_id = %s AND friend_id = %s) OR (user_id = %s AND friend_id = %s)",
(user_id, friend_id, friend_id, user_id)
)
conn.commit()
return {"success": True}
finally:
cur.close()
conn.close()
def search_users(query: str, current_user_id: int, limit: int = 20):
"""Search for users by username or display name"""
conn = get_db_connection()
cur = conn.cursor(cursor_factory=RealDictCursor)
try:
search_pattern = f"%{query}%"
cur.execute(
"""
SELECT u.id, u.username, u.display_name, u.email,
EXISTS(SELECT 1 FROM friendships WHERE user_id = %s AND friend_id = u.id) AS is_friend,
EXISTS(SELECT 1 FROM friend_requests WHERE sender_id = %s AND receiver_id = u.id AND status = 'pending') AS request_sent
FROM users u
WHERE (u.username ILIKE %s OR u.display_name ILIKE %s) AND u.id != %s
ORDER BY u.display_name
LIMIT %s
""",
(current_user_id, current_user_id, search_pattern, search_pattern, current_user_id, limit)
)
return [dict(row) for row in cur.fetchall()]
finally:
cur.close()
conn.close()