from sqlalchemy.orm import Session from sqlalchemy import or_ import models import schemas def get_guest(db: Session, guest_id: int): return db.query(models.Guest).filter(models.Guest.id == guest_id).first() def get_guests(db: Session, skip: int = 0, limit: int = 100): return db.query(models.Guest).offset(skip).limit(limit).all() def create_guest(db: Session, guest: schemas.GuestCreate): db_guest = models.Guest(**guest.model_dump()) db.add(db_guest) db.commit() db.refresh(db_guest) return db_guest def update_guest(db: Session, guest_id: int, guest: schemas.GuestUpdate): db_guest = db.query(models.Guest).filter(models.Guest.id == guest_id).first() if db_guest: update_data = guest.model_dump(exclude_unset=True) for field, value in update_data.items(): setattr(db_guest, field, value) db.commit() db.refresh(db_guest) return db_guest def delete_guest(db: Session, guest_id: int): db_guest = db.query(models.Guest).filter(models.Guest.id == guest_id).first() if db_guest: db.delete(db_guest) db.commit() return True return False def search_guests( db: Session, query: str = "", rsvp_status: str = None, meal_preference: str = None, owner: str = None ): db_query = db.query(models.Guest) # Search by name, email, or phone if query: search_pattern = f"%{query}%" db_query = db_query.filter( or_( models.Guest.first_name.ilike(search_pattern), models.Guest.last_name.ilike(search_pattern), models.Guest.email.ilike(search_pattern), models.Guest.phone_number.ilike(search_pattern) ) ) # Filter by RSVP status if rsvp_status: db_query = db_query.filter(models.Guest.rsvp_status == rsvp_status) # Filter by meal preference if meal_preference: db_query = db_query.filter(models.Guest.meal_preference == meal_preference) # Filter by owner if owner: db_query = db_query.filter(models.Guest.owner == owner) return db_query.all() def delete_guests_bulk(db: Session, guest_ids: list[int]): """Delete multiple guests by their IDs""" deleted_count = db.query(models.Guest).filter(models.Guest.id.in_(guest_ids)).delete(synchronize_session=False) db.commit() return deleted_count def delete_guests_by_owner(db: Session, owner: str): """Delete all guests by owner (for undo import)""" # Delete guests where owner matches exactly or is in comma-separated list deleted_count = db.query(models.Guest).filter( or_( models.Guest.owner == owner, models.Guest.owner.like(f"{owner},%"), models.Guest.owner.like(f"%,{owner},%"), models.Guest.owner.like(f"%,{owner}") ) ).delete(synchronize_session=False) db.commit() return deleted_count def get_unique_owners(db: Session): """Get list of unique owner emails""" results = db.query(models.Guest.owner).distinct().filter(models.Guest.owner.isnot(None)).all() owners = set() for result in results: if result[0]: # Split comma-separated owners for owner in result[0].split(','): owners.add(owner.strip()) return sorted(list(owners))