76 lines
2.6 KiB
Python
76 lines
2.6 KiB
Python
import os
|
|
import uuid
|
|
from app.db import get_db_connection
|
|
from app.config import settings
|
|
from app.schemas import PhotoResponse, PhotoUploadResponse
|
|
|
|
class PhotoService:
|
|
"""Handle photo uploads and management"""
|
|
|
|
@staticmethod
|
|
def upload_photo(profile_id: int, file_content: bytes, filename: str) -> PhotoUploadResponse:
|
|
"""Upload and save a photo"""
|
|
os.makedirs(settings.media_dir, exist_ok=True)
|
|
|
|
# Generate unique filename
|
|
unique_id = str(uuid.uuid4())
|
|
file_ext = os.path.splitext(filename)[1]
|
|
new_filename = f"{unique_id}{file_ext}"
|
|
file_path = os.path.join(settings.media_dir, new_filename)
|
|
|
|
# Save file
|
|
with open(file_path, "wb") as f:
|
|
f.write(file_content)
|
|
|
|
# Save metadata in DB
|
|
with get_db_connection() as conn:
|
|
cur = conn.cursor()
|
|
|
|
# Get next display order
|
|
cur.execute("SELECT MAX(display_order) FROM photos WHERE profile_id = %s", (profile_id,))
|
|
max_order = cur.fetchone()[0] or 0
|
|
|
|
cur.execute(
|
|
"INSERT INTO photos (profile_id, file_path, display_order) VALUES (%s, %s, %s) RETURNING id",
|
|
(profile_id, new_filename, max_order + 1)
|
|
)
|
|
photo_id = cur.fetchone()[0]
|
|
conn.commit()
|
|
|
|
return PhotoUploadResponse(
|
|
id=photo_id,
|
|
profile_id=profile_id,
|
|
file_path=new_filename,
|
|
message="Photo uploaded successfully"
|
|
)
|
|
|
|
@staticmethod
|
|
def get_photo(photo_id: int) -> str:
|
|
"""Get photo file path by ID"""
|
|
with get_db_connection() as conn:
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT file_path FROM photos WHERE id = %s", (photo_id,))
|
|
row = cur.fetchone()
|
|
return row[0] if row else None
|
|
|
|
@staticmethod
|
|
def delete_photo(photo_id: int, profile_id: int) -> bool:
|
|
"""Delete a photo"""
|
|
with get_db_connection() as conn:
|
|
cur = conn.cursor()
|
|
cur.execute("SELECT file_path FROM photos WHERE id = %s AND profile_id = %s", (photo_id, profile_id))
|
|
row = cur.fetchone()
|
|
|
|
if not row:
|
|
return False
|
|
|
|
# Delete file
|
|
file_path = os.path.join(settings.media_dir, row[0])
|
|
if os.path.exists(file_path):
|
|
os.remove(file_path)
|
|
|
|
# Delete from DB
|
|
cur.execute("DELETE FROM photos WHERE id = %s AND profile_id = %s", (photo_id, profile_id))
|
|
conn.commit()
|
|
return True
|