All checks were successful
ci/woodpecker/push/woodpecker Pipeline was successful
106 lines
3.6 KiB
Python
106 lines
3.6 KiB
Python
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
from app.database.database import get_db
|
|
from app.models import ContactMessage, User
|
|
from app.schemas.contact import ContactMessageCreate, ContactMessageResponse, ContactMessageUpdate
|
|
from app.services.auth import get_current_admin_user
|
|
|
|
router = APIRouter(prefix="/api/contact", tags=["contact"])
|
|
|
|
|
|
@router.post("", response_model=ContactMessageResponse)
|
|
def send_contact_message(message: ContactMessageCreate, db: Session = Depends(get_db)):
|
|
"""Public endpoint - anyone can send a contact message"""
|
|
message_data = message.model_dump() if hasattr(message, 'model_dump') else message.dict()
|
|
db_message = ContactMessage(**message_data)
|
|
db.add(db_message)
|
|
db.commit()
|
|
db.refresh(db_message)
|
|
return db_message
|
|
|
|
|
|
# Admin endpoints
|
|
admin_router = APIRouter(prefix="/api/admin/contact-messages", tags=["admin-contact"])
|
|
|
|
|
|
@admin_router.get("", response_model=List[ContactMessageResponse])
|
|
def get_all_messages(
|
|
status: Optional[str] = None,
|
|
is_read: Optional[bool] = None,
|
|
skip: int = 0,
|
|
limit: int = 100,
|
|
db: Session = Depends(get_db),
|
|
admin: User = Depends(get_current_admin_user)
|
|
):
|
|
"""Get all contact messages with optional filters"""
|
|
query = db.query(ContactMessage)
|
|
|
|
if status:
|
|
query = query.filter(ContactMessage.status == status)
|
|
if is_read is not None:
|
|
query = query.filter(ContactMessage.is_read == is_read)
|
|
|
|
messages = query.order_by(ContactMessage.created_at.desc()).offset(skip).limit(limit).all()
|
|
return messages
|
|
|
|
|
|
@admin_router.get("/unread-count")
|
|
def get_unread_count(
|
|
db: Session = Depends(get_db),
|
|
admin: User = Depends(get_current_admin_user)
|
|
):
|
|
"""Get count of unread messages"""
|
|
count = db.query(ContactMessage).filter(ContactMessage.is_read == False).count()
|
|
return {"unread_count": count}
|
|
|
|
|
|
@admin_router.get("/{message_id}", response_model=ContactMessageResponse)
|
|
def get_message(
|
|
message_id: int,
|
|
db: Session = Depends(get_db),
|
|
admin: User = Depends(get_current_admin_user)
|
|
):
|
|
"""Get a single contact message by ID"""
|
|
message = db.query(ContactMessage).filter(ContactMessage.id == message_id).first()
|
|
if not message:
|
|
raise HTTPException(status_code=404, detail="Message not found")
|
|
return message
|
|
|
|
|
|
@admin_router.put("/{message_id}", response_model=ContactMessageResponse)
|
|
def update_message(
|
|
message_id: int,
|
|
message_update: ContactMessageUpdate,
|
|
db: Session = Depends(get_db),
|
|
admin: User = Depends(get_current_admin_user)
|
|
):
|
|
"""Update contact message status/notes"""
|
|
message = db.query(ContactMessage).filter(ContactMessage.id == message_id).first()
|
|
if not message:
|
|
raise HTTPException(status_code=404, detail="Message not found")
|
|
|
|
update_data = message_update.model_dump(exclude_unset=True) if hasattr(message_update, 'model_dump') else message_update.dict(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(message, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(message)
|
|
return message
|
|
|
|
|
|
@admin_router.delete("/{message_id}")
|
|
def delete_message(
|
|
message_id: int,
|
|
db: Session = Depends(get_db),
|
|
admin: User = Depends(get_current_admin_user)
|
|
):
|
|
"""Delete a contact message"""
|
|
message = db.query(ContactMessage).filter(ContactMessage.id == message_id).first()
|
|
if not message:
|
|
raise HTTPException(status_code=404, detail="Message not found")
|
|
|
|
db.delete(message)
|
|
db.commit()
|
|
return {"message": "Contact message deleted successfully"}
|