from typing import Optional
from uuid import UUID

from sqlalchemy import or_, and_, func
from sqlalchemy.orm import Session

import models
import schemas

# Identity of the built-in admin account. Callers may identify it by the
# non-UUID string alias below; it is stored under a fixed UUID.
_ADMIN_USER_ALIAS = 'admin-user'
_ADMIN_USER_UUID = UUID('00000000-0000-0000-0000-000000000001')
_ADMIN_USER_EMAIL = 'admin@admin.local'


def _resolve_user_uuid(user_id):
    """Normalise a user identifier to the value stored in UUID columns.

    Args:
        user_id: A ``UUID``, a UUID string, or the ``'admin-user'`` alias.

    Returns:
        The admin UUID for the alias or for any string that does not parse
        as a UUID; the parsed ``UUID`` for a valid UUID string; otherwise
        ``user_id`` unchanged (non-string values are passed straight through).
    """
    if not isinstance(user_id, str):
        return user_id
    if user_id == _ADMIN_USER_ALIAS:
        return _ADMIN_USER_UUID
    try:
        return UUID(user_id)
    except ValueError:
        # NOTE(review): every unparsable string is treated as the admin
        # account. This mirrors the long-standing behaviour of this module.
        return _ADMIN_USER_UUID


def _ensure_admin_user(db: Session) -> None:
    """Insert (and flush) the admin user row if it does not exist yet."""
    admin_user = db.query(models.User).filter(
        models.User.id == _ADMIN_USER_UUID
    ).first()
    if not admin_user:
        db.add(models.User(id=_ADMIN_USER_UUID, email=_ADMIN_USER_EMAIL))
        db.flush()


def _to_role(role):
    """Map a role name to its ``models.RoleEnum`` member; pass members through.

    Raises:
        AttributeError: If ``role`` is a string that names no enum attribute.
    """
    return getattr(models.RoleEnum, role) if isinstance(role, str) else role


# ============================================
# User CRUD
# ============================================

def get_or_create_user(db: Session, email: str) -> models.User:
    """Return the user with ``email``, creating and committing it if missing."""
    user = get_user_by_email(db, email)
    if not user:
        user = models.User(email=email)
        db.add(user)
        db.commit()
        db.refresh(user)
    return user


def get_user(db: Session, user_id: UUID) -> Optional[models.User]:
    """Return the user with the given ID, or ``None``."""
    return db.query(models.User).filter(models.User.id == user_id).first()


def get_user_by_email(db: Session, email: str) -> Optional[models.User]:
    """Return the first user with the given email, or ``None``."""
    return db.query(models.User).filter(models.User.email == email).first()


# ============================================
# Event CRUD
# ============================================

def create_event(db: Session, event: schemas.EventCreate, creator_user_id) -> models.Event:
    """Create an event and add its creator as an admin member.

    Args:
        db: Database session.
        event: Payload whose ``model_dump()`` supplies the ``Event`` fields.
        creator_user_id: ``UUID``, UUID string, or the ``'admin-user'`` alias.

    Returns:
        The committed and refreshed event.
    """
    db_event = models.Event(**event.model_dump())
    db.add(db_event)
    db.flush()  # Ensure event has ID

    creator_uuid = _resolve_user_uuid(creator_user_id)
    if creator_uuid == _ADMIN_USER_UUID:
        # The membership row below points at the admin user, so make sure
        # that row exists whenever the creator resolves to the admin account.
        _ensure_admin_user(db)

    # Add creator as admin member
    member = models.EventMember(
        event_id=db_event.id,
        user_id=creator_uuid,
        role=models.RoleEnum.admin,
        display_name="Admin"
    )
    db.add(member)
    db.commit()
    db.refresh(db_event)
    return db_event


def get_event(db: Session, event_id: UUID) -> Optional[models.Event]:
    """Return the event with the given ID, or ``None``."""
    return db.query(models.Event).filter(models.Event.id == event_id).first()


def get_events_for_user(db: Session, user_id):
    """Get all events where the user is a member.

    ``user_id`` may be a ``UUID``, a UUID string, or the ``'admin-user'`` alias.
    """
    user_uuid = _resolve_user_uuid(user_id)
    return db.query(models.Event).join(
        models.EventMember,
        models.Event.id == models.EventMember.event_id
    ).filter(models.EventMember.user_id == user_uuid).all()


def update_event(db: Session, event_id: UUID, event: schemas.EventUpdate) -> Optional[models.Event]:
    """Apply the explicitly-set fields of ``event`` to the stored event.

    Returns:
        The refreshed event, or ``None`` if no event has ``event_id``.
    """
    db_event = get_event(db, event_id)
    if not db_event:
        return db_event

    # exclude_unset: only overwrite fields the caller actually provided.
    for field, value in event.model_dump(exclude_unset=True).items():
        setattr(db_event, field, value)
    db.commit()
    db.refresh(db_event)
    return db_event


def delete_event(db: Session, event_id: UUID) -> bool:
    """Delete an event. Returns ``True`` if it existed, ``False`` otherwise."""
    db_event = get_event(db, event_id)
    if not db_event:
        return False
    db.delete(db_event)
    db.commit()
    return True


# ============================================
# Event Member CRUD
# ============================================

def create_event_member(
    db: Session,
    event_id: UUID,
    user_id: UUID,
    role: str = "admin",
    display_name: Optional[str] = None
) -> Optional[models.EventMember]:
    """Add a user to an event.

    Args:
        db: Database session.
        event_id: Event to join.
        user_id: User being added.
        role: ``models.RoleEnum`` member or the name of one.
        display_name: Optional name stored on the membership.

    Returns:
        The committed and refreshed membership row.

    Raises:
        AttributeError: If ``role`` is a string that names no enum attribute.
    """
    member = models.EventMember(
        event_id=event_id,
        user_id=user_id,
        role=_to_role(role),
        display_name=display_name
    )
    db.add(member)
    db.commit()
    db.refresh(member)
    return member


def get_event_member(db: Session, event_id: UUID, user_id) -> Optional[models.EventMember]:
    """Return the user's membership in the event (carrying their role), or ``None``.

    ``user_id`` may be a ``UUID``, a UUID string, or the ``'admin-user'`` alias.
    """
    user_uuid = _resolve_user_uuid(user_id)
    return db.query(models.EventMember).filter(
        and_(
            models.EventMember.event_id == event_id,
            models.EventMember.user_id == user_uuid
        )
    ).first()


def get_event_members(db: Session, event_id: UUID):
    """Get all members of an event."""
    return db.query(models.EventMember).filter(
        models.EventMember.event_id == event_id
    ).all()


def update_event_member_role(
    db: Session,
    event_id: UUID,
    user_id,
    role: str
) -> Optional[models.EventMember]:
    """Change a member's role.

    Returns:
        The refreshed membership, or ``None`` if the user is not a member.

    Raises:
        AttributeError: If ``role`` is a string that names no enum attribute.
    """
    member = get_event_member(db, event_id, user_id)
    if member:
        member.role = _to_role(role)
        db.commit()
        db.refresh(member)
    return member


def remove_event_member(db: Session, event_id: UUID, user_id) -> bool:
    """Remove a user from an event. Returns ``True`` if a membership was deleted."""
    member = get_event_member(db, event_id, user_id)
    if not member:
        return False
    db.delete(member)
    db.commit()
    return True


# ============================================
# Guest CRUD (Event-Scoped)
# ============================================

def create_guest(
    db: Session,
    event_id: UUID,
    guest: schemas.GuestCreate,
    added_by_user_id
) -> models.Guest:
    """Create a guest for an event.

    Args:
        db: Database session.
        event_id: Event the guest belongs to.
        guest: Payload whose ``model_dump()`` supplies the ``Guest`` fields.
        added_by_user_id: ``UUID``, UUID string, or the ``'admin-user'`` alias.

    Returns:
        The committed and refreshed guest.
    """
    db_guest = models.Guest(
        event_id=event_id,
        added_by_user_id=_resolve_user_uuid(added_by_user_id),
        **guest.model_dump()
    )
    db.add(db_guest)
    db.commit()
    db.refresh(db_guest)
    return db_guest


def get_guest(db: Session, guest_id: UUID, event_id: UUID) -> Optional[models.Guest]:
    """Get a guest, matching on both guest ID and event ID."""
    return db.query(models.Guest).filter(
        and_(
            models.Guest.id == guest_id,
            models.Guest.event_id == event_id
        )
    ).first()


def get_guests(
    db: Session,
    event_id: UUID,
    skip: int = 0,
    limit: int = 1000
):
    """Get an event's guests, offset by ``skip`` and capped at ``limit`` rows."""
    return db.query(models.Guest).filter(
        models.Guest.event_id == event_id
    ).offset(skip).limit(limit).all()


def search_guests(
    db: Session,
    event_id: UUID,
    query: Optional[str] = None,
    status: Optional[str] = None,
    side: Optional[str] = None,
    added_by_user_id: Optional[UUID] = None,
    owner_email: Optional[str] = None
):
    """Search/filter guests for an event.

    Every supplied (truthy) argument narrows the result further.

    Args:
        db: Database session.
        event_id: Event whose guests are searched.
        query: Case-insensitive substring matched against first name,
            last name, phone number and email.
        status: Exact ``rsvp_status`` to match.
        side: Exact ``side`` to match.
        added_by_user_id: Only guests added by this user.
        owner_email: Exact ``owner_email`` to match; the special value
            ``"self-service"`` instead selects guests whose ``source`` is
            ``"self-service"``.

    Returns:
        List of matching guests.
    """
    db_query = db.query(models.Guest).filter(models.Guest.event_id == event_id)

    if query:
        search_pattern = f"%{query}%"
        db_query = db_query.filter(
            or_(
                models.Guest.first_name.ilike(search_pattern),
                models.Guest.last_name.ilike(search_pattern),
                models.Guest.phone_number.ilike(search_pattern),
                models.Guest.email.ilike(search_pattern)
            )
        )

    if status:
        db_query = db_query.filter(models.Guest.rsvp_status == status)

    if side:
        db_query = db_query.filter(models.Guest.side == side)

    if added_by_user_id:
        db_query = db_query.filter(models.Guest.added_by_user_id == added_by_user_id)

    if owner_email:
        if owner_email == "self-service":
            db_query = db_query.filter(models.Guest.source == "self-service")
        else:
            db_query = db_query.filter(models.Guest.owner_email == owner_email)

    return db_query.all()


def update_guest(
    db: Session,
    guest_id: UUID,
    event_id: UUID,
    guest: schemas.GuestUpdate
) -> Optional[models.Guest]:
    """Apply the explicitly-set fields of ``guest`` to a guest of the event.

    Returns:
        The refreshed guest, or ``None`` if no such guest exists in the event.
    """
    db_guest = get_guest(db, guest_id, event_id)
    if not db_guest:
        return db_guest

    # exclude_unset: only overwrite fields the caller actually provided.
    for field, value in guest.model_dump(exclude_unset=True).items():
        setattr(db_guest, field, value)
    db.commit()
    db.refresh(db_guest)
    return db_guest


def delete_guest(db: Session, guest_id: UUID, event_id: UUID) -> bool:
    """Delete a guest of the event. Returns ``True`` if a guest was deleted."""
    db_guest = get_guest(db, guest_id, event_id)
    if not db_guest:
        return False
    db.delete(db_guest)
    db.commit()
    return True


def bulk_import_guests(
    db: Session,
    event_id: UUID,
    guests: list[schemas.GuestImportItem],
    added_by_user_id: UUID
) -> list[models.Guest]:
    """Import multiple guests in a single commit.

    Args:
        db: Database session.
        event_id: Event the guests belong to.
        guests: Items whose ``model_dump()`` supplies the ``Guest`` fields.
        added_by_user_id: ``UUID`` of the importing user; a UUID string or
            the ``'admin-user'`` alias is also accepted, as in ``create_guest``.

    Returns:
        The created guests, refreshed after the commit.
    """
    user_uuid = _resolve_user_uuid(added_by_user_id)
    imported_guests = [
        models.Guest(
            event_id=event_id,
            added_by_user_id=user_uuid,
            **guest_data.model_dump()
        )
        for guest_data in guests
    ]
    db.add_all(imported_guests)
    db.commit()

    # Refresh all to get IDs and timestamps
    for guest in imported_guests:
        db.refresh(guest)

    return imported_guests


def delete_guests_bulk(db: Session, event_id: UUID, guest_ids: list[UUID]) -> int:
    """Delete the listed guests of an event and return how many rows were removed."""
    if not guest_ids:
        # Nothing to match; skip the DELETE with an empty IN list.
        return 0

    deleted_count = db.query(models.Guest).filter(
        and_(
            models.Guest.event_id == event_id,
            models.Guest.id.in_(guest_ids)
        )
    ).delete(synchronize_session=False)
    db.commit()
    return deleted_count


def get_guests_by_status(db: Session, event_id: UUID, status: str):
    """Get the event's guests whose ``rsvp_status`` equals ``status``."""
    return db.query(models.Guest).filter(
        and_(
            models.Guest.event_id == event_id,
            models.Guest.rsvp_status == status
        )
    ).all()


def get_guests_by_side(db: Session, event_id: UUID, side: str):
    """Get the event's guests whose ``side`` equals ``side``."""
    return db.query(models.Guest).filter(
        and_(
            models.Guest.event_id == event_id,
            models.Guest.side == side
        )
    ).all()


def get_guest_by_phone(db: Session, event_id: UUID, phone: str) -> Optional[models.Guest]:
    """Get the first guest of the event with exactly this phone number."""
    return db.query(models.Guest).filter(
        and_(
            models.Guest.event_id == event_id,
            models.Guest.phone_number == phone
        )
    ).first()


# ============================================
# Statistics and Analytics
# ============================================

def get_event_stats(db: Session, event_id: UUID):
    """Summarise RSVP numbers for one event.

    Issues three COUNT queries (all guests, confirmed, declined). Every
    guest that is neither confirmed nor declined is reported as invited.

    Returns:
        dict with ``total``, ``confirmed``, ``declined``, ``invited`` and
        ``confirmation_rate`` (percentage of confirmed guests, ``0`` when
        the event has no guests).
    """
    in_event = models.Guest.event_id == event_id

    def _count_guests(*criteria):
        # COUNT(id) over this event's guests, narrowed by any extra criteria;
        # a missing scalar result is reported as 0.
        counter = db.query(func.count(models.Guest.id)).filter(in_event, *criteria)
        return counter.scalar() or 0

    total = _count_guests()
    confirmed = _count_guests(models.Guest.rsvp_status == "confirmed")
    declined = _count_guests(models.Guest.rsvp_status == "declined")

    if total > 0:
        confirmation_rate = confirmed / total * 100
    else:
        confirmation_rate = 0

    return {
        "total": total,
        "confirmed": confirmed,
        "declined": declined,
        "invited": total - confirmed - declined,
        "confirmation_rate": confirmation_rate
    }


def get_sides_summary(db: Session, event_id: UUID):
    """Count an event's guests per ``side``.

    Returns:
        One ``{"side": ..., "count": ...}`` dict per distinct side value.
    """
    per_side = db.query(
        models.Guest.side,
        func.count(models.Guest.id).label('count')
    )
    per_side = per_side.filter(models.Guest.event_id == event_id)
    rows = per_side.group_by(models.Guest.side).all()

    summary = []
    for side, count in rows:
        summary.append({"side": side, "count": count})
    return summary


# ============================================
# WhatsApp Integration - CRUD
# ============================================

def get_guest_for_whatsapp(db: Session, event_id: UUID, guest_id: UUID) -> Optional[models.Guest]:
    """Fetch one guest of the event for WhatsApp sending, or ``None``."""
    guest_lookup = db.query(models.Guest).filter(
        models.Guest.id == guest_id,
        models.Guest.event_id == event_id
    )
    return guest_lookup.first()


def get_guests_for_whatsapp(db: Session, event_id: UUID, guest_ids: list) -> list:
    """Fetch the listed guests of the event for WhatsApp sending."""
    guest_lookup = db.query(models.Guest).filter(
        models.Guest.event_id == event_id,
        models.Guest.id.in_(guest_ids)
    )
    return guest_lookup.all()


def get_event_for_whatsapp(db: Session, event_id: UUID) -> Optional[models.Event]:
    """Fetch the event whose details feed the WhatsApp template variables."""
    event_lookup = db.query(models.Event).filter(models.Event.id == event_id)
    return event_lookup.first()


# ============================================
# Duplicate Detection & Merging
# ============================================

# Values of ``by`` that find_duplicate_guests knows how to group on.
_DUPLICATE_CRITERIA = ("phone", "email", "name")


def _duplicate_key(guest, by: str) -> Optional[str]:
    """Return the trimmed, lower-cased grouping key for ``guest``, or ``None`` if blank."""
    if by == "phone":
        raw = guest.phone_number or ""
    elif by == "email":
        raw = guest.email or ""
    elif by == "name":
        raw = f"{guest.first_name or ''} {guest.last_name or ''}"
    else:
        return None
    key = raw.strip().lower()
    return key or None


def _guest_summary(guest) -> dict:
    """Flatten a guest into the dict shape returned inside duplicate groups."""
    return {
        "id": str(guest.id),
        "first_name": guest.first_name or "",
        "last_name": guest.last_name or "",
        "phone_number": guest.phone_number or "",
        "email": guest.email or "",
        "rsvp_status": guest.rsvp_status or "invited",
        "meal_preference": guest.meal_preference or "",
        "has_plus_one": bool(guest.has_plus_one),
        "plus_one_name": guest.plus_one_name or "",
        "table_number": guest.table_number or "",
        "owner": guest.owner_email or "",
    }


def find_duplicate_guests(db: Session, event_id: UUID, by: str = "phone") -> dict:
    """
    Find duplicate guests within an event.

    Guests are grouped by their phone number, email, or full name after
    trimming and lower-casing; guests with a blank value are ignored.
    Response structure matches the DuplicateManager frontend component.

    Args:
        db: Database session
        event_id: Event ID
        by: "phone", "email" or "name"; any other value yields no groups

    Returns:
        dict with "duplicates" (groups of 2+ guests sharing a key),
        "count" (number of groups) and "by" (echo of the argument)
    """
    if by not in _DUPLICATE_CRITERIA:
        # Unsupported criterion can never match; avoid loading the guests.
        return {"duplicates": [], "count": 0, "by": by}

    guests = db.query(models.Guest).filter(
        models.Guest.event_id == event_id
    ).all()

    # Group guests by key
    groups: dict = {}
    for guest in guests:
        key = _duplicate_key(guest, by)
        if key is None:
            continue
        groups.setdefault(key, []).append(_guest_summary(guest))

    # Build result list — only groups with 2+ guests
    duplicate_groups = []
    for key, members in groups.items():
        if len(members) < 2:
            continue

        # Pick display values from the first member
        first = members[0]
        group_entry = {
            "key": key,
            "count": len(members),
            "guests": members,
        }
        if by == "phone":
            group_entry["phone_number"] = first["phone_number"] or key
        elif by == "email":
            group_entry["email"] = first["email"] or key
        else:  # name
            group_entry["first_name"] = first["first_name"]
            group_entry["last_name"] = first["last_name"]

        duplicate_groups.append(group_entry)

    return {
        "duplicates": duplicate_groups,
        "count": len(duplicate_groups),
        "by": by,
    }


def merge_guests(db: Session, event_id: UUID, keep_id: UUID, merge_ids: list) -> dict:
    """
    Merge multiple guests into one

    The guests listed in merge_ids are deleted; no field values are copied
    onto the kept guest. keep_id is never deleted, even if it also appears
    in merge_ids.

    Args:
        db: Database session
        event_id: Event ID
        keep_id: Guest ID to keep
        merge_ids: List of guest IDs to merge into keep_id

    Returns:
        dict with merge results

    Raises:
        ValueError: If keep_id is not a guest of the event, or if merge_ids
            matches no other guest of the event
    """
    # Verify keep_id exists and is in the event
    keep_guest = db.query(models.Guest).filter(
        and_(
            models.Guest.id == keep_id,
            models.Guest.event_id == event_id
        )
    ).first()

    if not keep_guest:
        raise ValueError("Keep guest not found in event")

    # Get guests to merge. The kept guest is excluded so that it cannot be
    # deleted (and the refresh below cannot fail) when keep_id is in merge_ids.
    duplicates = db.query(models.Guest).filter(
        and_(
            models.Guest.event_id == event_id,
            models.Guest.id.in_(merge_ids),
            models.Guest.id != keep_id
        )
    ).all()

    if not duplicates:
        raise ValueError("No guests to merge found")

    # Delete duplicates
    for guest in duplicates:
        db.delete(guest)

    db.commit()
    db.refresh(keep_guest)

    return {
        "status": "success",
        "kept_guest_id": str(keep_guest.id),
        "kept_guest_name": f"{keep_guest.first_name} {keep_guest.last_name}",
        "merged_count": len(duplicates)
    }