""" Database utilities for managing notifications. """ from db_utils import get_conn def create_notification(user_id: int, type: str, message: str, related_id: int = None): """Create a new notification for a user.""" conn = get_conn() cur = conn.cursor() cur.execute( """ INSERT INTO notifications (user_id, type, message, related_id) VALUES (%s, %s, %s, %s) RETURNING id, user_id, type, message, related_id, is_read, created_at """, (user_id, type, message, related_id) ) row = cur.fetchone() conn.commit() cur.close() conn.close() if row: return { "id": row["id"], "user_id": row["user_id"], "type": row["type"], "message": row["message"], "related_id": row["related_id"], "is_read": row["is_read"], "created_at": row["created_at"] } return None def get_user_notifications(user_id: int, unread_only: bool = False): """Get all notifications for a user.""" conn = get_conn() cur = conn.cursor() query = """ SELECT id, user_id, type, message, related_id, is_read, created_at FROM notifications WHERE user_id = %s """ if unread_only: query += " AND is_read = FALSE" query += " ORDER BY created_at DESC" cur.execute(query, (user_id,)) rows = cur.fetchall() cur.close() conn.close() notifications = [] for row in rows: notifications.append({ "id": row["id"], "user_id": row["user_id"], "type": row["type"], "message": row["message"], "related_id": row["related_id"], "is_read": row["is_read"], "created_at": row["created_at"] }) return notifications def mark_notification_as_read(notification_id: int, user_id: int): """Mark a notification as read.""" conn = get_conn() cur = conn.cursor() cur.execute( """ UPDATE notifications SET is_read = TRUE WHERE id = %s AND user_id = %s """, (notification_id, user_id) ) conn.commit() cur.close() conn.close() return True def mark_all_notifications_as_read(user_id: int): """Mark all notifications for a user as read.""" conn = get_conn() cur = conn.cursor() cur.execute( """ UPDATE notifications SET is_read = TRUE WHERE user_id = %s AND is_read = FALSE """, (user_id,) ) conn.commit() cur.close() conn.close() return True def delete_notification(notification_id: int, user_id: int): """Delete a notification.""" conn = get_conn() cur = conn.cursor() cur.execute( """ DELETE FROM notifications WHERE id = %s AND user_id = %s """, (notification_id, user_id) ) conn.commit() cur.close() conn.close() return True