111 lines
3.3 KiB
Python
111 lines
3.3 KiB
Python
from sqlalchemy.orm import Session
|
|
from sqlalchemy import or_
|
|
import models
|
|
import schemas
|
|
|
|
|
|
def get_guest(db: Session, guest_id: int):
|
|
return db.query(models.Guest).filter(models.Guest.id == guest_id).first()
|
|
|
|
|
|
def get_guests(db: Session, skip: int = 0, limit: int = 100):
|
|
return db.query(models.Guest).offset(skip).limit(limit).all()
|
|
|
|
|
|
def create_guest(db: Session, guest: schemas.GuestCreate):
|
|
db_guest = models.Guest(**guest.model_dump())
|
|
db.add(db_guest)
|
|
db.commit()
|
|
db.refresh(db_guest)
|
|
return db_guest
|
|
|
|
|
|
def update_guest(db: Session, guest_id: int, guest: schemas.GuestUpdate):
|
|
db_guest = db.query(models.Guest).filter(models.Guest.id == guest_id).first()
|
|
if db_guest:
|
|
update_data = guest.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(db_guest, field, value)
|
|
db.commit()
|
|
db.refresh(db_guest)
|
|
return db_guest
|
|
|
|
|
|
def delete_guest(db: Session, guest_id: int):
|
|
db_guest = db.query(models.Guest).filter(models.Guest.id == guest_id).first()
|
|
if db_guest:
|
|
db.delete(db_guest)
|
|
db.commit()
|
|
return True
|
|
return False
|
|
|
|
|
|
def search_guests(
|
|
db: Session,
|
|
query: str = "",
|
|
rsvp_status: str = None,
|
|
meal_preference: str = None,
|
|
owner: str = None
|
|
):
|
|
db_query = db.query(models.Guest)
|
|
|
|
# Search by name, email, or phone
|
|
if query:
|
|
search_pattern = f"%{query}%"
|
|
db_query = db_query.filter(
|
|
or_(
|
|
models.Guest.first_name.ilike(search_pattern),
|
|
models.Guest.last_name.ilike(search_pattern),
|
|
models.Guest.email.ilike(search_pattern),
|
|
models.Guest.phone_number.ilike(search_pattern)
|
|
)
|
|
)
|
|
|
|
# Filter by RSVP status
|
|
if rsvp_status:
|
|
db_query = db_query.filter(models.Guest.rsvp_status == rsvp_status)
|
|
|
|
# Filter by meal preference
|
|
if meal_preference:
|
|
db_query = db_query.filter(models.Guest.meal_preference == meal_preference)
|
|
|
|
# Filter by owner
|
|
if owner:
|
|
db_query = db_query.filter(models.Guest.owner == owner)
|
|
|
|
return db_query.all()
|
|
|
|
|
|
def delete_guests_bulk(db: Session, guest_ids: list[int]):
|
|
"""Delete multiple guests by their IDs"""
|
|
deleted_count = db.query(models.Guest).filter(models.Guest.id.in_(guest_ids)).delete(synchronize_session=False)
|
|
db.commit()
|
|
return deleted_count
|
|
|
|
|
|
def delete_guests_by_owner(db: Session, owner: str):
|
|
"""Delete all guests by owner (for undo import)"""
|
|
# Delete guests where owner matches exactly or is in comma-separated list
|
|
deleted_count = db.query(models.Guest).filter(
|
|
or_(
|
|
models.Guest.owner == owner,
|
|
models.Guest.owner.like(f"{owner},%"),
|
|
models.Guest.owner.like(f"%,{owner},%"),
|
|
models.Guest.owner.like(f"%,{owner}")
|
|
)
|
|
).delete(synchronize_session=False)
|
|
db.commit()
|
|
return deleted_count
|
|
|
|
|
|
def get_unique_owners(db: Session):
|
|
"""Get list of unique owner emails"""
|
|
results = db.query(models.Guest.owner).distinct().filter(models.Guest.owner.isnot(None)).all()
|
|
owners = set()
|
|
for result in results:
|
|
if result[0]:
|
|
# Split comma-separated owners
|
|
for owner in result[0].split(','):
|
|
owners.add(owner.strip())
|
|
return sorted(list(owners))
|