invy/backend/crud.py

216 lines
7.1 KiB
Python

from sqlalchemy.orm import Session
from sqlalchemy import or_
import models
import schemas
def get_guest(db: Session, guest_id: int):
return db.query(models.Guest).filter(models.Guest.id == guest_id).first()
def get_guests(db: Session, skip: int = 0, limit: int = 100):
return db.query(models.Guest).offset(skip).limit(limit).all()
def create_guest(db: Session, guest: schemas.GuestCreate):
db_guest = models.Guest(**guest.model_dump())
db.add(db_guest)
db.commit()
db.refresh(db_guest)
return db_guest
def update_guest(db: Session, guest_id: int, guest: schemas.GuestUpdate):
db_guest = db.query(models.Guest).filter(models.Guest.id == guest_id).first()
if db_guest:
update_data = guest.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_guest, field, value)
db.commit()
db.refresh(db_guest)
return db_guest
def delete_guest(db: Session, guest_id: int):
db_guest = db.query(models.Guest).filter(models.Guest.id == guest_id).first()
if db_guest:
db.delete(db_guest)
db.commit()
return True
return False
def search_guests(
db: Session,
query: str = "",
rsvp_status: str = None,
meal_preference: str = None,
owner: str = None
):
db_query = db.query(models.Guest)
# Search by name, email, or phone
if query:
search_pattern = f"%{query}%"
db_query = db_query.filter(
or_(
models.Guest.first_name.ilike(search_pattern),
models.Guest.last_name.ilike(search_pattern),
models.Guest.email.ilike(search_pattern),
models.Guest.phone_number.ilike(search_pattern)
)
)
# Filter by RSVP status
if rsvp_status:
db_query = db_query.filter(models.Guest.rsvp_status == rsvp_status)
# Filter by meal preference
if meal_preference:
db_query = db_query.filter(models.Guest.meal_preference == meal_preference)
# Filter by owner
if owner:
db_query = db_query.filter(models.Guest.owner == owner)
return db_query.all()
def delete_guests_bulk(db: Session, guest_ids: list[int]):
"""Delete multiple guests by their IDs"""
deleted_count = db.query(models.Guest).filter(models.Guest.id.in_(guest_ids)).delete(synchronize_session=False)
db.commit()
return deleted_count
def delete_guests_by_owner(db: Session, owner: str):
"""Delete all guests by owner (for undo import)"""
# Delete guests where owner matches exactly or is in comma-separated list
deleted_count = db.query(models.Guest).filter(
or_(
models.Guest.owner == owner,
models.Guest.owner.like(f"{owner},%"),
models.Guest.owner.like(f"%,{owner},%"),
models.Guest.owner.like(f"%,{owner}")
)
).delete(synchronize_session=False)
db.commit()
return deleted_count
def get_unique_owners(db: Session):
"""Get list of unique owner emails"""
results = db.query(models.Guest.owner).distinct().filter(models.Guest.owner.isnot(None)).all()
owners = set()
for result in results:
if result[0]:
# Split comma-separated owners
for owner in result[0].split(','):
owners.add(owner.strip())
return sorted(list(owners))
def find_duplicate_guests(db: Session, by: str = "phone"):
"""Find guests with duplicate phone numbers or names"""
from sqlalchemy import func, and_
if by == "name":
# Find duplicate full names (first + last name combination)
duplicates = db.query(
models.Guest.first_name,
models.Guest.last_name,
func.count(models.Guest.id).label('count')
).filter(
models.Guest.first_name.isnot(None),
models.Guest.first_name != '',
models.Guest.last_name.isnot(None),
models.Guest.last_name != ''
).group_by(
models.Guest.first_name,
models.Guest.last_name
).having(
func.count(models.Guest.id) > 1
).all()
# Get full guest details for each duplicate name
result = []
for first_name, last_name, count in duplicates:
guests = db.query(models.Guest).filter(
and_(
models.Guest.first_name == first_name,
models.Guest.last_name == last_name
)
).all()
result.append({
'key': f"{first_name} {last_name}",
'first_name': first_name,
'last_name': last_name,
'count': count,
'guests': guests,
'type': 'name'
})
else: # by == "phone"
# Find phone numbers that appear more than once
duplicates = db.query(
models.Guest.phone_number,
func.count(models.Guest.id).label('count')
).filter(
models.Guest.phone_number.isnot(None),
models.Guest.phone_number != ''
).group_by(
models.Guest.phone_number
).having(
func.count(models.Guest.id) > 1
).all()
# Get full guest details for each duplicate phone number
result = []
for phone_number, count in duplicates:
guests = db.query(models.Guest).filter(
models.Guest.phone_number == phone_number
).all()
result.append({
'key': phone_number,
'phone_number': phone_number,
'count': count,
'guests': guests,
'type': 'phone'
})
return result
def merge_guests(db: Session, keep_id: int, merge_ids: list[int]):
"""Merge multiple guests into one, keeping the specified guest"""
keep_guest = db.query(models.Guest).filter(models.Guest.id == keep_id).first()
if not keep_guest:
return None
merge_guests = db.query(models.Guest).filter(models.Guest.id.in_(merge_ids)).all()
# Merge data: combine information from all guests
for guest in merge_guests:
# Keep non-empty values from merged guests
if not keep_guest.email and guest.email:
keep_guest.email = guest.email
if not keep_guest.phone_number and guest.phone_number:
keep_guest.phone_number = guest.phone_number
if not keep_guest.meal_preference and guest.meal_preference:
keep_guest.meal_preference = guest.meal_preference
if not keep_guest.table_number and guest.table_number:
keep_guest.table_number = guest.table_number
# Combine owners
if guest.owner and guest.owner not in (keep_guest.owner or ''):
if keep_guest.owner:
keep_guest.owner = f"{keep_guest.owner}, {guest.owner}"
else:
keep_guest.owner = guest.owner
# Delete merged guests
db.query(models.Guest).filter(models.Guest.id.in_(merge_ids)).delete(synchronize_session=False)
db.commit()
db.refresh(keep_guest)
return keep_guest