125 lines
2.9 KiB
Python
125 lines
2.9 KiB
Python
"""
|
|
Database utilities for managing notifications.
|
|
"""
|
|
|
|
from db_utils import get_conn
|
|
|
|
|
|
def create_notification(user_id: int, type: str, message: str, related_id: int = None):
    """Create a new notification for a user.

    Inserts one row into ``notifications``, commits, and returns the stored
    row as a plain dict.

    Args:
        user_id: Value written to the ``user_id`` column.
        type: Value written to the ``type`` column. The name shadows the
            builtin; it is kept so keyword callers keep working.
        message: Value written to the ``message`` column.
        related_id: Value written to the ``related_id`` column; ``None`` is
            handed to the driver unchanged (presumably stored as NULL).

    Returns:
        A dict with the keys ``id``, ``user_id``, ``type``, ``message``,
        ``related_id``, ``is_read`` and ``created_at`` taken from the
        ``RETURNING`` row, or ``None`` if the driver returned no row.

    Raises:
        Any exception raised by the driver is propagated unchanged; the
        cursor and connection are closed first.
    """
    # Rows are read by column name, so this relies on get_conn() producing
    # cursors with mapping-style rows (as the rest of this module does).
    columns = (
        "id",
        "user_id",
        "type",
        "message",
        "related_id",
        "is_read",
        "created_at",
    )

    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                """
                INSERT INTO notifications (user_id, type, message, related_id)
                VALUES (%s, %s, %s, %s)
                RETURNING id, user_id, type, message, related_id, is_read, created_at
                """,
                (user_id, type, message, related_id),
            )
            row = cur.fetchone()
            conn.commit()
        finally:
            # Release the cursor even when execute/fetch/commit raises.
            cur.close()
    finally:
        # Always release the connection; previously it leaked on any error.
        conn.close()

    if not row:
        return None
    return {column: row[column] for column in columns}
|
|
|
|
|
|
def get_user_notifications(user_id: int, unread_only: bool = False):
    """Get all notifications for a user, newest first.

    Args:
        user_id: Only rows whose ``user_id`` column equals this value are
            returned.
        unread_only: When true, additionally restrict the result to rows
            with ``is_read = FALSE``.

    Returns:
        A list of dicts (possibly empty), ordered by ``created_at``
        descending. Each dict has the keys ``id``, ``user_id``, ``type``,
        ``message``, ``related_id``, ``is_read`` and ``created_at``.

    Raises:
        Any exception raised by the driver is propagated unchanged; the
        cursor and connection are closed first.
    """
    # Rows are read by column name, so this relies on get_conn() producing
    # cursors with mapping-style rows (as the rest of this module does).
    columns = (
        "id",
        "user_id",
        "type",
        "message",
        "related_id",
        "is_read",
        "created_at",
    )

    query = """
        SELECT id, user_id, type, message, related_id, is_read, created_at
        FROM notifications
        WHERE user_id = %s
    """
    # Only fixed SQL fragments are appended; user input stays parameterized.
    if unread_only:
        query += " AND is_read = FALSE"
    query += " ORDER BY created_at DESC"

    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(query, (user_id,))
            rows = cur.fetchall()
        finally:
            # Release the cursor even when execute/fetch raises.
            cur.close()
    finally:
        # Always release the connection; previously it leaked on any error.
        conn.close()

    return [{column: row[column] for column in columns} for row in rows]
|
|
|
|
|
|
def mark_notification_as_read(notification_id: int, user_id: int):
    """Mark a notification as read.

    The update is scoped to both ``id`` and ``user_id``, so a notification
    that belongs to a different user is left untouched.

    Args:
        notification_id: ``id`` of the notification to update.
        user_id: ``user_id`` the notification must belong to.

    Returns:
        Always ``True`` once the statement has been committed, including
        when no row matched. Callers cannot use the return value to detect
        a missing notification.

    Raises:
        Any exception raised by the driver is propagated unchanged; the
        cursor and connection are closed first.
    """
    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                """
                UPDATE notifications
                SET is_read = TRUE
                WHERE id = %s AND user_id = %s
                """,
                (notification_id, user_id),
            )
            conn.commit()
        finally:
            # Release the cursor even when execute/commit raises.
            cur.close()
    finally:
        # Always release the connection; previously it leaked on any error.
        conn.close()
    return True
|
|
|
|
|
|
def mark_all_notifications_as_read(user_id: int):
    """Mark all notifications for a user as read.

    Only rows that are currently unread (``is_read = FALSE``) are updated.

    Args:
        user_id: ``user_id`` whose unread notifications are updated.

    Returns:
        Always ``True`` once the statement has been committed, including
        when the user had no unread notifications.

    Raises:
        Any exception raised by the driver is propagated unchanged; the
        cursor and connection are closed first.
    """
    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                """
                UPDATE notifications
                SET is_read = TRUE
                WHERE user_id = %s AND is_read = FALSE
                """,
                (user_id,),
            )
            conn.commit()
        finally:
            # Release the cursor even when execute/commit raises.
            cur.close()
    finally:
        # Always release the connection; previously it leaked on any error.
        conn.close()
    return True
|
|
|
|
|
|
def delete_notification(notification_id: int, user_id: int):
    """Delete a notification.

    The delete is scoped to both ``id`` and ``user_id``, so a notification
    that belongs to a different user is left untouched.

    Args:
        notification_id: ``id`` of the notification to delete.
        user_id: ``user_id`` the notification must belong to.

    Returns:
        Always ``True`` once the statement has been committed, including
        when no row matched.

    Raises:
        Any exception raised by the driver is propagated unchanged; the
        cursor and connection are closed first.
    """
    conn = get_conn()
    try:
        cur = conn.cursor()
        try:
            cur.execute(
                """
                DELETE FROM notifications
                WHERE id = %s AND user_id = %s
                """,
                (notification_id, user_id),
            )
            conn.commit()
        finally:
            # Release the cursor even when execute/commit raises.
            cur.close()
    finally:
        # Always release the connection; previously it leaked on any error.
        conn.close()
    return True
|