178 lines
5.6 KiB
Python
178 lines
5.6 KiB
Python
from sqlalchemy.orm import Session
|
|
from sqlalchemy import or_
|
|
import models
|
|
import schemas
|
|
|
|
|
|
def get_guest(db: Session, guest_id: int):
    """Fetch a single guest by primary key.

    Returns the first ``models.Guest`` row whose ``id`` equals *guest_id*,
    or ``None`` when no row matches.
    """
    matching = db.query(models.Guest).filter(models.Guest.id == guest_id)
    return matching.first()
|
|
|
|
|
|
def get_guests(db: Session, skip: int = 0, limit: int = 100):
    """Return one page of guests.

    Args:
        db: Active SQLAlchemy session.
        skip: Number of rows to skip from the start of the result set.
        limit: Maximum number of rows to return.

    Returns:
        A list of ``models.Guest`` rows, ordered by ``id``.
    """
    # Without an explicit ORDER BY the database may return rows in any order,
    # so consecutive OFFSET/LIMIT pages could overlap or miss rows. Ordering
    # by the primary key makes paging deterministic.
    return (
        db.query(models.Guest)
        .order_by(models.Guest.id)
        .offset(skip)
        .limit(limit)
        .all()
    )
|
|
|
|
|
|
def create_guest(db: Session, guest: schemas.GuestCreate):
    """Insert a new guest built from *guest* and return the stored row.

    The schema is dumped to a dict, passed as keyword arguments to
    ``models.Guest``, added to the session and committed.
    """
    field_values = guest.model_dump()
    new_row = models.Guest(**field_values)

    db.add(new_row)
    db.commit()
    # Re-read the row's attributes from the database after the commit.
    db.refresh(new_row)
    return new_row
|
|
|
|
|
|
def update_guest(db: Session, guest_id: int, guest: schemas.GuestUpdate):
    """Apply a partial update to the guest with the given id.

    Only fields explicitly set on *guest* are written. Returns the refreshed
    row, or the falsy lookup result (``None``) when no guest has *guest_id*.
    """
    target = db.query(models.Guest).filter(models.Guest.id == guest_id).first()
    if not target:
        return target

    # exclude_unset leaves out fields the caller never supplied, so they
    # keep their current values.
    changes = guest.model_dump(exclude_unset=True)
    for name, new_value in changes.items():
        setattr(target, name, new_value)

    db.commit()
    db.refresh(target)
    return target
|
|
|
|
|
|
def delete_guest(db: Session, guest_id: int):
    """Delete the guest with the given id.

    Returns ``True`` when a row was found and deleted, ``False`` when no
    guest has *guest_id*.
    """
    doomed = db.query(models.Guest).filter(models.Guest.id == guest_id).first()
    if not doomed:
        return False

    db.delete(doomed)
    db.commit()
    return True
|
|
|
|
|
|
def search_guests(
    db: Session,
    query: str = "",
    rsvp_status: str = None,
    meal_preference: str = None,
    owner: str = None
):
    """Search guests by free text and optional exact-match filters.

    Args:
        db: Active SQLAlchemy session.
        query: Text matched case-insensitively as a substring of first name,
            last name, email or phone number. Empty means no text filter.
        rsvp_status: When truthy, keep only guests with this RSVP status.
        meal_preference: When truthy, keep only guests with this preference.
        owner: When truthy, keep only guests whose ``owner`` column equals
            this value exactly.

    Returns:
        A list of matching ``models.Guest`` rows.
    """
    db_query = db.query(models.Guest)

    # Search by name, email, or phone
    if query:
        # Escape LIKE metacharacters so that a literal "%" or "_" typed by the
        # user is matched as text instead of acting as a wildcard. "/" is the
        # escape character and therefore has to be escaped first.
        escaped = (
            query.replace("/", "//").replace("%", "/%").replace("_", "/_")
        )
        search_pattern = f"%{escaped}%"
        searchable_columns = (
            models.Guest.first_name,
            models.Guest.last_name,
            models.Guest.email,
            models.Guest.phone_number,
        )
        db_query = db_query.filter(
            or_(
                *(
                    column.ilike(search_pattern, escape="/")
                    for column in searchable_columns
                )
            )
        )

    # Filter by RSVP status
    if rsvp_status:
        db_query = db_query.filter(models.Guest.rsvp_status == rsvp_status)

    # Filter by meal preference
    if meal_preference:
        db_query = db_query.filter(models.Guest.meal_preference == meal_preference)

    # Filter by owner
    if owner:
        db_query = db_query.filter(models.Guest.owner == owner)

    return db_query.all()
|
|
|
|
|
|
def delete_guests_bulk(db: Session, guest_ids: list[int]):
    """Delete every guest whose id is in *guest_ids*.

    Issues a single bulk DELETE, commits, and returns the number of rows the
    DELETE reported as removed.
    """
    id_is_listed = models.Guest.id.in_(guest_ids)
    doomed_rows = db.query(models.Guest).filter(id_is_listed)
    # synchronize_session=False: do not try to update objects already loaded
    # in the session to reflect the deletion.
    removed = doomed_rows.delete(synchronize_session=False)
    db.commit()
    return removed
|
|
|
|
|
|
def delete_guests_by_owner(db: Session, owner: str):
    """Delete all guests by owner (for undo import).

    A guest matches when its ``owner`` column equals *owner* exactly or lists
    *owner* as one entry of a comma-separated list. Both "a,b" and "a, b"
    list styles are recognised.

    Args:
        db: Active SQLAlchemy session.
        owner: Owner value to match. A falsy value deletes nothing.

    Returns:
        The number of rows deleted.
    """
    # Guard: ``owner == None`` would compile to IS NULL and wipe every guest
    # that has no owner, which is never what an "undo import" wants.
    if not owner:
        return 0

    # "_" and "%" are LIKE wildcards; "_" in particular is common in email
    # addresses. Escape them so only the literal owner value matches. "/" is
    # the escape character and therefore has to be escaped first.
    escaped = owner.replace("/", "//").replace("%", "/%").replace("_", "/_")

    owner_column = models.Guest.owner
    list_patterns = (
        f"{escaped},%",      # first entry of a list
        f"%,{escaped},%",    # middle entry, "a,b,c" style
        f"%,{escaped}",      # last entry, "a,b" style
        f"%, {escaped},%",   # middle entry, "a, b, c" style
        f"%, {escaped}",     # last entry, "a, b" style
    )

    deleted_count = (
        db.query(models.Guest)
        .filter(
            or_(
                owner_column == owner,
                *(
                    owner_column.like(pattern, escape="/")
                    for pattern in list_patterns
                ),
            )
        )
        .delete(synchronize_session=False)
    )
    db.commit()
    return deleted_count
|
|
|
|
|
|
def get_unique_owners(db: Session):
    """Get the sorted list of unique owner values.

    Each non-null ``owner`` column value is split on commas and every entry
    is stripped of surrounding whitespace. Blank entries (from values such as
    "a,,b" or "a,") are ignored.

    Returns:
        A sorted list of distinct owner strings.
    """
    results = (
        db.query(models.Guest.owner)
        .distinct()
        .filter(models.Guest.owner.isnot(None))
        .all()
    )

    owners = set()
    for result in results:
        raw_value = result[0]
        if not raw_value:
            continue
        # Split comma-separated owners, dropping empty fragments
        stripped_parts = (part.strip() for part in raw_value.split(','))
        owners.update(name for name in stripped_parts if name)

    return sorted(owners)
|
|
|
|
|
|
def find_duplicate_guests(db: Session):
    """Group guests that share the same phone number.

    Returns a list of dicts, one per phone number held by more than one
    guest, with keys ``'phone_number'``, ``'count'`` and ``'guests'`` (the
    full ``models.Guest`` rows with that number). NULL and empty phone
    numbers are ignored.
    """
    from sqlalchemy import func

    phone = models.Guest.phone_number
    occurrences = func.count(models.Guest.id)

    # Step 1: phone numbers that appear on more than one guest.
    repeated_numbers = (
        db.query(phone, occurrences.label('count'))
        .filter(phone.isnot(None), phone != '')
        .group_by(phone)
        .having(occurrences > 1)
        .all()
    )

    # Step 2: load the guests behind each repeated number.
    groups = []
    for number, total in repeated_numbers:
        holders = db.query(models.Guest).filter(phone == number).all()
        groups.append({
            'phone_number': number,
            'count': total,
            'guests': holders,
        })

    return groups
|
|
|
|
|
|
def merge_guests(db: Session, keep_id: int, merge_ids: list[int]):
    """Merge multiple guests into one, keeping the specified guest.

    For email, phone number, meal preference and table number, the kept
    guest's value is preserved when present; otherwise the first non-empty
    value among the merged guests is used. Owners of all guests are combined
    into one comma-separated list without duplicates. The merged guests are
    then deleted.

    Args:
        db: Active SQLAlchemy session.
        keep_id: Id of the guest that survives the merge.
        merge_ids: Ids of the guests to fold into the survivor. ``keep_id``
            is ignored if it appears here.

    Returns:
        The refreshed surviving guest, or ``None`` when *keep_id* does not
        exist.
    """
    keep_guest = db.query(models.Guest).filter(models.Guest.id == keep_id).first()
    if not keep_guest:
        return None

    # The survivor must never be one of the rows deleted below; otherwise the
    # merge would destroy the record it is supposed to keep and the final
    # refresh would fail.
    source_ids = [guest_id for guest_id in merge_ids if guest_id != keep_id]

    sources = []
    if source_ids:
        sources = (
            db.query(models.Guest)
            .filter(models.Guest.id.in_(source_ids))
            .all()
        )

    # Owners are tracked as individual entries so that "bob@x" is not treated
    # as already present merely because it is a substring of "bigbob@x".
    owners = [
        part.strip()
        for part in (keep_guest.owner or '').split(',')
        if part.strip()
    ]
    owners_changed = False

    fill_if_empty = ('email', 'phone_number', 'meal_preference', 'table_number')

    # Merge data: combine information from all guests
    for guest in sources:
        # Keep non-empty values from merged guests
        for field in fill_if_empty:
            incoming = getattr(guest, field)
            if not getattr(keep_guest, field) and incoming:
                setattr(keep_guest, field, incoming)

        # Combine owners
        for part in (guest.owner or '').split(','):
            name = part.strip()
            if name and name not in owners:
                owners.append(name)
                owners_changed = True

    if owners_changed:
        # Join with a bare comma: the owner-matching patterns elsewhere in
        # this module look for "value," / ",value" with no space.
        keep_guest.owner = ','.join(owners)

    # Delete merged guests
    if source_ids:
        db.query(models.Guest).filter(
            models.Guest.id.in_(source_ids)
        ).delete(synchronize_session=False)

    db.commit()
    db.refresh(keep_guest)

    return keep_guest
|