602 lines
18 KiB
Python
602 lines
18 KiB
Python
from sqlalchemy.orm import Session
|
|
from sqlalchemy import or_, and_, func
|
|
import models
|
|
import schemas
|
|
from uuid import UUID
|
|
from typing import Optional
|
|
|
|
|
|
# ============================================
|
|
# User CRUD
|
|
# ============================================
|
|
def get_or_create_user(db: Session, email: str) -> models.User:
|
|
"""Get existing user or create new one"""
|
|
user = db.query(models.User).filter(models.User.email == email).first()
|
|
if not user:
|
|
user = models.User(email=email)
|
|
db.add(user)
|
|
db.commit()
|
|
db.refresh(user)
|
|
return user
|
|
|
|
|
|
def get_user(db: Session, user_id: UUID) -> Optional[models.User]:
|
|
return db.query(models.User).filter(models.User.id == user_id).first()
|
|
|
|
|
|
def get_user_by_email(db: Session, email: str) -> Optional[models.User]:
|
|
return db.query(models.User).filter(models.User.email == email).first()
|
|
|
|
|
|
# ============================================
|
|
# Event CRUD
|
|
# ============================================
|
|
def create_event(db: Session, event: schemas.EventCreate, creator_user_id) -> models.Event:
|
|
"""Create event and add creator as admin member"""
|
|
from uuid import UUID
|
|
|
|
db_event = models.Event(**event.model_dump())
|
|
db.add(db_event)
|
|
db.flush() # Ensure event has ID
|
|
|
|
# Handle both UUID and string user IDs (admin user)
|
|
if isinstance(creator_user_id, str):
|
|
# For admin users (non-UUID), use a fixed UUID
|
|
if creator_user_id == 'admin-user':
|
|
creator_uuid = UUID('00000000-0000-0000-0000-000000000001')
|
|
# Ensure admin user exists in database
|
|
admin_user = db.query(models.User).filter(models.User.id == creator_uuid).first()
|
|
if not admin_user:
|
|
admin_user = models.User(id=creator_uuid, email='admin@admin.local')
|
|
db.add(admin_user)
|
|
db.flush()
|
|
else:
|
|
# Try to parse as UUID
|
|
try:
|
|
creator_uuid = UUID(creator_user_id)
|
|
except ValueError:
|
|
creator_uuid = UUID('00000000-0000-0000-0000-000000000001')
|
|
# Ensure admin user exists
|
|
admin_user = db.query(models.User).filter(models.User.id == creator_uuid).first()
|
|
if not admin_user:
|
|
admin_user = models.User(id=creator_uuid, email='admin@admin.local')
|
|
db.add(admin_user)
|
|
db.flush()
|
|
else:
|
|
creator_uuid = creator_user_id
|
|
|
|
# Add creator as admin member
|
|
member = models.EventMember(
|
|
event_id=db_event.id,
|
|
user_id=creator_uuid,
|
|
role=models.RoleEnum.admin,
|
|
display_name="Admin"
|
|
)
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(db_event)
|
|
return db_event
|
|
|
|
|
|
def get_event(db: Session, event_id: UUID) -> Optional[models.Event]:
|
|
return db.query(models.Event).filter(models.Event.id == event_id).first()
|
|
|
|
|
|
def get_events_for_user(db: Session, user_id):
|
|
"""Get all events where user is a member"""
|
|
from uuid import UUID
|
|
|
|
# Handle both UUID and string user IDs (admin user)
|
|
if isinstance(user_id, str):
|
|
if user_id == 'admin-user':
|
|
user_uuid = UUID('00000000-0000-0000-0000-000000000001')
|
|
else:
|
|
try:
|
|
user_uuid = UUID(user_id)
|
|
except ValueError:
|
|
user_uuid = UUID('00000000-0000-0000-0000-000000000001')
|
|
else:
|
|
user_uuid = user_id
|
|
|
|
return db.query(models.Event).join(
|
|
models.EventMember,
|
|
models.Event.id == models.EventMember.event_id
|
|
).filter(models.EventMember.user_id == user_uuid).all()
|
|
|
|
|
|
def update_event(db: Session, event_id: UUID, event: schemas.EventUpdate) -> Optional[models.Event]:
|
|
db_event = db.query(models.Event).filter(models.Event.id == event_id).first()
|
|
if db_event:
|
|
update_data = event.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(db_event, field, value)
|
|
db.commit()
|
|
db.refresh(db_event)
|
|
return db_event
|
|
|
|
|
|
def delete_event(db: Session, event_id: UUID) -> bool:
|
|
db_event = db.query(models.Event).filter(models.Event.id == event_id).first()
|
|
if db_event:
|
|
db.delete(db_event)
|
|
db.commit()
|
|
return True
|
|
return False
|
|
|
|
|
|
# ============================================
|
|
# Event Member CRUD
|
|
# ============================================
|
|
def create_event_member(
|
|
db: Session,
|
|
event_id: UUID,
|
|
user_id: UUID,
|
|
role: str = "admin",
|
|
display_name: Optional[str] = None
|
|
) -> Optional[models.EventMember]:
|
|
"""Add user to event"""
|
|
member = models.EventMember(
|
|
event_id=event_id,
|
|
user_id=user_id,
|
|
role=getattr(models.RoleEnum, role) if isinstance(role, str) else role,
|
|
display_name=display_name
|
|
)
|
|
db.add(member)
|
|
db.commit()
|
|
db.refresh(member)
|
|
return member
|
|
|
|
|
|
def get_event_member(db: Session, event_id: UUID, user_id) -> Optional[models.EventMember]:
|
|
"""Check if user is member of event and get their role"""
|
|
# Handle both UUID and string user IDs (admin user)
|
|
if isinstance(user_id, str):
|
|
if user_id == 'admin-user':
|
|
user_uuid = UUID('00000000-0000-0000-0000-000000000001')
|
|
else:
|
|
try:
|
|
user_uuid = UUID(user_id)
|
|
except ValueError:
|
|
user_uuid = UUID('00000000-0000-0000-0000-000000000001')
|
|
else:
|
|
user_uuid = user_id
|
|
|
|
return db.query(models.EventMember).filter(
|
|
and_(
|
|
models.EventMember.event_id == event_id,
|
|
models.EventMember.user_id == user_uuid
|
|
)
|
|
).first()
|
|
|
|
|
|
def get_event_members(db: Session, event_id: UUID):
|
|
"""Get all members of an event"""
|
|
return db.query(models.EventMember).filter(
|
|
models.EventMember.event_id == event_id
|
|
).all()
|
|
|
|
|
|
def update_event_member_role(
|
|
db: Session,
|
|
event_id: UUID,
|
|
user_id,
|
|
role: str
|
|
) -> Optional[models.EventMember]:
|
|
"""Update member's role"""
|
|
member = get_event_member(db, event_id, user_id)
|
|
if member:
|
|
member.role = getattr(models.RoleEnum, role) if isinstance(role, str) else role
|
|
db.commit()
|
|
db.refresh(member)
|
|
return member
|
|
|
|
|
|
def remove_event_member(db: Session, event_id: UUID, user_id) -> bool:
|
|
"""Remove user from event"""
|
|
member = get_event_member(db, event_id, user_id)
|
|
if member:
|
|
db.delete(member)
|
|
db.commit()
|
|
return True
|
|
return False
|
|
|
|
|
|
# ============================================
|
|
# Guest CRUD (Event-Scoped)
|
|
# ============================================
|
|
def create_guest(
|
|
db: Session,
|
|
event_id: UUID,
|
|
guest: schemas.GuestCreate,
|
|
added_by_user_id
|
|
) -> models.Guest:
|
|
"""Create a guest for an event"""
|
|
# Handle both UUID and string user IDs (admin user)
|
|
if isinstance(added_by_user_id, str):
|
|
if added_by_user_id == 'admin-user':
|
|
user_uuid = UUID('00000000-0000-0000-0000-000000000001')
|
|
else:
|
|
try:
|
|
user_uuid = UUID(added_by_user_id)
|
|
except ValueError:
|
|
user_uuid = UUID('00000000-0000-0000-0000-000000000001')
|
|
else:
|
|
user_uuid = added_by_user_id
|
|
|
|
db_guest = models.Guest(
|
|
event_id=event_id,
|
|
added_by_user_id=user_uuid,
|
|
**guest.model_dump()
|
|
)
|
|
db.add(db_guest)
|
|
db.commit()
|
|
db.refresh(db_guest)
|
|
return db_guest
|
|
|
|
|
|
def get_guest(db: Session, guest_id: UUID, event_id: UUID) -> Optional[models.Guest]:
|
|
"""Get guest (verify it belongs to event)"""
|
|
return db.query(models.Guest).filter(
|
|
and_(
|
|
models.Guest.id == guest_id,
|
|
models.Guest.event_id == event_id
|
|
)
|
|
).first()
|
|
|
|
|
|
def get_guests(
|
|
db: Session,
|
|
event_id: UUID,
|
|
skip: int = 0,
|
|
limit: int = 1000
|
|
):
|
|
"""Get all guests for an event"""
|
|
return db.query(models.Guest).filter(
|
|
models.Guest.event_id == event_id
|
|
).offset(skip).limit(limit).all()
|
|
|
|
|
|
def search_guests(
|
|
db: Session,
|
|
event_id: UUID,
|
|
query: Optional[str] = None,
|
|
status: Optional[str] = None,
|
|
side: Optional[str] = None,
|
|
added_by_user_id: Optional[UUID] = None,
|
|
owner_email: Optional[str] = None
|
|
):
|
|
"""Search/filter guests for an event"""
|
|
db_query = db.query(models.Guest).filter(models.Guest.event_id == event_id)
|
|
|
|
if query:
|
|
search_pattern = f"%{query}%"
|
|
db_query = db_query.filter(
|
|
or_(
|
|
models.Guest.first_name.ilike(search_pattern),
|
|
models.Guest.last_name.ilike(search_pattern),
|
|
models.Guest.phone_number.ilike(search_pattern),
|
|
models.Guest.email.ilike(search_pattern)
|
|
)
|
|
)
|
|
|
|
if status:
|
|
db_query = db_query.filter(models.Guest.rsvp_status == status)
|
|
|
|
if side:
|
|
db_query = db_query.filter(models.Guest.side == side)
|
|
|
|
if added_by_user_id:
|
|
db_query = db_query.filter(models.Guest.added_by_user_id == added_by_user_id)
|
|
|
|
if owner_email:
|
|
if owner_email == "self-service":
|
|
db_query = db_query.filter(models.Guest.source == "self-service")
|
|
else:
|
|
db_query = db_query.filter(models.Guest.owner_email == owner_email)
|
|
|
|
return db_query.all()
|
|
|
|
|
|
def update_guest(
|
|
db: Session,
|
|
guest_id: UUID,
|
|
event_id: UUID,
|
|
guest: schemas.GuestUpdate
|
|
) -> Optional[models.Guest]:
|
|
"""Update guest (verify it belongs to event)"""
|
|
db_guest = get_guest(db, guest_id, event_id)
|
|
if db_guest:
|
|
update_data = guest.model_dump(exclude_unset=True)
|
|
for field, value in update_data.items():
|
|
setattr(db_guest, field, value)
|
|
db.commit()
|
|
db.refresh(db_guest)
|
|
return db_guest
|
|
|
|
|
|
def delete_guest(db: Session, guest_id: UUID, event_id: UUID) -> bool:
|
|
"""Delete guest (verify it belongs to event)"""
|
|
db_guest = get_guest(db, guest_id, event_id)
|
|
if db_guest:
|
|
db.delete(db_guest)
|
|
db.commit()
|
|
return True
|
|
return False
|
|
|
|
|
|
def bulk_import_guests(
|
|
db: Session,
|
|
event_id: UUID,
|
|
guests: list[schemas.GuestImportItem],
|
|
added_by_user_id: UUID
|
|
) -> list[models.Guest]:
|
|
"""Import multiple guests at once"""
|
|
imported_guests = []
|
|
for guest_data in guests:
|
|
db_guest = models.Guest(
|
|
event_id=event_id,
|
|
added_by_user_id=added_by_user_id,
|
|
**guest_data.model_dump()
|
|
)
|
|
db.add(db_guest)
|
|
imported_guests.append(db_guest)
|
|
|
|
db.commit()
|
|
# Refresh all to get IDs and timestamps
|
|
for guest in imported_guests:
|
|
db.refresh(guest)
|
|
|
|
return imported_guests
|
|
|
|
|
|
def delete_guests_bulk(db: Session, event_id: UUID, guest_ids: list[UUID]) -> int:
|
|
"""Delete multiple guests"""
|
|
deleted_count = db.query(models.Guest).filter(
|
|
and_(
|
|
models.Guest.event_id == event_id,
|
|
models.Guest.id.in_(guest_ids)
|
|
)
|
|
).delete(synchronize_session=False)
|
|
db.commit()
|
|
return deleted_count
|
|
|
|
|
|
def get_guests_by_status(db: Session, event_id: UUID, status: str):
|
|
"""Get guests with specific status"""
|
|
return db.query(models.Guest).filter(
|
|
and_(
|
|
models.Guest.event_id == event_id,
|
|
models.Guest.rsvp_status == status
|
|
)
|
|
).all()
|
|
|
|
|
|
def get_guests_by_side(db: Session, event_id: UUID, side: str):
|
|
"""Get guests for a specific side"""
|
|
return db.query(models.Guest).filter(
|
|
and_(
|
|
models.Guest.event_id == event_id,
|
|
models.Guest.side == side
|
|
)
|
|
).all()
|
|
|
|
|
|
def get_guest_by_phone(db: Session, event_id: UUID, phone: str) -> Optional[models.Guest]:
|
|
"""Get guest by phone number (within event)"""
|
|
return db.query(models.Guest).filter(
|
|
and_(
|
|
models.Guest.event_id == event_id,
|
|
models.Guest.phone_number == phone
|
|
)
|
|
).first()
|
|
|
|
|
|
# ============================================
|
|
# Statistics and Analytics
|
|
# ============================================
|
|
def get_event_stats(db: Session, event_id: UUID):
|
|
"""Get summary stats for an event"""
|
|
total = db.query(func.count(models.Guest.id)).filter(
|
|
models.Guest.event_id == event_id
|
|
).scalar() or 0
|
|
|
|
confirmed = db.query(func.count(models.Guest.id)).filter(
|
|
and_(
|
|
models.Guest.event_id == event_id,
|
|
models.Guest.rsvp_status == "confirmed"
|
|
)
|
|
).scalar() or 0
|
|
|
|
declined = db.query(func.count(models.Guest.id)).filter(
|
|
and_(
|
|
models.Guest.event_id == event_id,
|
|
models.Guest.rsvp_status == "declined"
|
|
)
|
|
).scalar() or 0
|
|
|
|
invited = total - confirmed - declined
|
|
|
|
return {
|
|
"total": total,
|
|
"confirmed": confirmed,
|
|
"declined": declined,
|
|
"invited": invited,
|
|
"confirmation_rate": (confirmed / total * 100) if total > 0 else 0
|
|
}
|
|
|
|
|
|
def get_sides_summary(db: Session, event_id: UUID):
|
|
"""Get guest breakdown by side"""
|
|
sides = db.query(
|
|
models.Guest.side,
|
|
func.count(models.Guest.id).label('count')
|
|
).filter(
|
|
models.Guest.event_id == event_id
|
|
).group_by(models.Guest.side).all()
|
|
|
|
return [{"side": side, "count": count} for side, count in sides]
|
|
|
|
|
|
# ============================================
|
|
# WhatsApp Integration - CRUD
|
|
# ============================================
|
|
def get_guest_for_whatsapp(db: Session, event_id: UUID, guest_id: UUID) -> Optional[models.Guest]:
|
|
"""Get guest details for WhatsApp sending"""
|
|
return db.query(models.Guest).filter(
|
|
and_(
|
|
models.Guest.id == guest_id,
|
|
models.Guest.event_id == event_id
|
|
)
|
|
).first()
|
|
|
|
|
|
def get_guests_for_whatsapp(db: Session, event_id: UUID, guest_ids: list) -> list:
|
|
"""Get multiple guests for WhatsApp sending"""
|
|
return db.query(models.Guest).filter(
|
|
and_(
|
|
models.Guest.event_id == event_id,
|
|
models.Guest.id.in_(guest_ids)
|
|
)
|
|
).all()
|
|
|
|
|
|
def get_event_for_whatsapp(db: Session, event_id: UUID) -> Optional[models.Event]:
|
|
"""Get event details needed for WhatsApp template variables"""
|
|
return db.query(models.Event).filter(models.Event.id == event_id).first()
|
|
|
|
|
|
# ============================================
|
|
# Duplicate Detection & Merging
|
|
# ============================================
|
|
def find_duplicate_guests(db: Session, event_id: UUID, by: str = "phone") -> dict:
|
|
"""
|
|
Find duplicate guests within an event.
|
|
Returns groups with 2+ guests sharing the same phone / email / name.
|
|
Response structure matches the DuplicateManager frontend component.
|
|
"""
|
|
guests = db.query(models.Guest).filter(
|
|
models.Guest.event_id == event_id
|
|
).all()
|
|
|
|
# group guests by key
|
|
groups: dict = {}
|
|
|
|
for guest in guests:
|
|
if by == "phone":
|
|
raw = (guest.phone_number or "").strip()
|
|
if not raw:
|
|
continue
|
|
key = raw.lower()
|
|
elif by == "email":
|
|
raw = (guest.email or "").strip()
|
|
if not raw:
|
|
continue
|
|
key = raw.lower()
|
|
elif by == "name":
|
|
raw = f"{guest.first_name or ''} {guest.last_name or ''}".strip()
|
|
if not raw or raw == " ":
|
|
continue
|
|
key = raw.lower()
|
|
else:
|
|
continue
|
|
|
|
entry = {
|
|
"id": str(guest.id),
|
|
"first_name": guest.first_name or "",
|
|
"last_name": guest.last_name or "",
|
|
"phone_number": guest.phone_number or "",
|
|
"email": guest.email or "",
|
|
"rsvp_status": guest.rsvp_status or "invited",
|
|
"meal_preference": guest.meal_preference or "",
|
|
"has_plus_one": bool(guest.has_plus_one),
|
|
"plus_one_name": guest.plus_one_name or "",
|
|
"table_number": guest.table_number or "",
|
|
"owner": guest.owner_email or "",
|
|
}
|
|
|
|
if key not in groups:
|
|
groups[key] = []
|
|
groups[key].append(entry)
|
|
|
|
# Build result list — only groups with 2+ guests
|
|
duplicate_groups = []
|
|
for key, members in groups.items():
|
|
if len(members) < 2:
|
|
continue
|
|
# Pick display values from the first member
|
|
first = members[0]
|
|
group_entry = {
|
|
"key": key,
|
|
"count": len(members),
|
|
"guests": members,
|
|
}
|
|
if by == "phone":
|
|
group_entry["phone_number"] = first["phone_number"] or key
|
|
elif by == "email":
|
|
group_entry["email"] = first["email"] or key
|
|
else: # name
|
|
group_entry["first_name"] = first["first_name"]
|
|
group_entry["last_name"] = first["last_name"]
|
|
duplicate_groups.append(group_entry)
|
|
|
|
return {
|
|
"duplicates": duplicate_groups,
|
|
"count": len(duplicate_groups),
|
|
"by": by,
|
|
}
|
|
|
|
|
|
def merge_guests(db: Session, event_id: UUID, keep_id: UUID, merge_ids: list) -> dict:
|
|
"""
|
|
Merge multiple guests into one
|
|
|
|
Args:
|
|
db: Database session
|
|
event_id: Event ID
|
|
keep_id: Guest ID to keep
|
|
merge_ids: List of guest IDs to merge into keep_id
|
|
|
|
Returns:
|
|
dict with merge results
|
|
"""
|
|
# Verify keep_id exists and is in the event
|
|
keep_guest = db.query(models.Guest).filter(
|
|
and_(
|
|
models.Guest.id == keep_id,
|
|
models.Guest.event_id == event_id
|
|
)
|
|
).first()
|
|
|
|
if not keep_guest:
|
|
raise ValueError("Keep guest not found in event")
|
|
|
|
# Get guests to merge
|
|
merge_guests = db.query(models.Guest).filter(
|
|
and_(
|
|
models.Guest.event_id == event_id,
|
|
models.Guest.id.in_(merge_ids)
|
|
)
|
|
).all()
|
|
|
|
if not merge_guests:
|
|
raise ValueError("No guests to merge found")
|
|
|
|
# Count merged guests
|
|
merged_count = 0
|
|
|
|
# Delete duplicates
|
|
for guest in merge_guests:
|
|
db.delete(guest)
|
|
merged_count += 1
|
|
|
|
db.commit()
|
|
db.refresh(keep_guest)
|
|
|
|
return {
|
|
"status": "success",
|
|
"kept_guest_id": str(keep_guest.id),
|
|
"kept_guest_name": f"{keep_guest.first_name} {keep_guest.last_name}",
|
|
"merged_count": merged_count
|
|
}
|
|
|