invy/backend/crud.py

372 lines
11 KiB
Python

from sqlalchemy.orm import Session
from sqlalchemy import or_, and_, func
import models
import schemas
from uuid import UUID
from typing import Optional
# ============================================
# User CRUD
# ============================================
def get_or_create_user(db: Session, email: str) -> models.User:
"""Get existing user or create new one"""
user = db.query(models.User).filter(models.User.email == email).first()
if not user:
user = models.User(email=email)
db.add(user)
db.commit()
db.refresh(user)
return user
def get_user(db: Session, user_id: UUID) -> Optional[models.User]:
return db.query(models.User).filter(models.User.id == user_id).first()
def get_user_by_email(db: Session, email: str) -> Optional[models.User]:
return db.query(models.User).filter(models.User.email == email).first()
# ============================================
# Event CRUD
# ============================================
def create_event(db: Session, event: schemas.EventCreate, creator_user_id: UUID) -> models.Event:
"""Create event and add creator as admin member"""
db_event = models.Event(**event.model_dump())
db.add(db_event)
db.flush() # Ensure event has ID
# Add creator as admin member
member = models.EventMember(
event_id=db_event.id,
user_id=creator_user_id,
role=models.RoleEnum.admin,
display_name="Admin"
)
db.add(member)
db.commit()
db.refresh(db_event)
return db_event
def get_event(db: Session, event_id: UUID) -> Optional[models.Event]:
return db.query(models.Event).filter(models.Event.id == event_id).first()
def get_events_for_user(db: Session, user_id: UUID):
"""Get all events where user is a member"""
return db.query(models.Event).join(
models.EventMember,
models.Event.id == models.EventMember.event_id
).filter(models.EventMember.user_id == user_id).all()
def update_event(db: Session, event_id: UUID, event: schemas.EventUpdate) -> Optional[models.Event]:
db_event = db.query(models.Event).filter(models.Event.id == event_id).first()
if db_event:
update_data = event.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_event, field, value)
db.commit()
db.refresh(db_event)
return db_event
def delete_event(db: Session, event_id: UUID) -> bool:
db_event = db.query(models.Event).filter(models.Event.id == event_id).first()
if db_event:
db.delete(db_event)
db.commit()
return True
return False
# ============================================
# Event Member CRUD
# ============================================
def create_event_member(
db: Session,
event_id: UUID,
user_id: UUID,
role: str = "admin",
display_name: Optional[str] = None
) -> Optional[models.EventMember]:
"""Add user to event"""
member = models.EventMember(
event_id=event_id,
user_id=user_id,
role=getattr(models.RoleEnum, role) if isinstance(role, str) else role,
display_name=display_name
)
db.add(member)
db.commit()
db.refresh(member)
return member
def get_event_member(db: Session, event_id: UUID, user_id: UUID) -> Optional[models.EventMember]:
"""Check if user is member of event and get their role"""
return db.query(models.EventMember).filter(
and_(
models.EventMember.event_id == event_id,
models.EventMember.user_id == user_id
)
).first()
def get_event_members(db: Session, event_id: UUID):
"""Get all members of an event"""
return db.query(models.EventMember).filter(
models.EventMember.event_id == event_id
).all()
def update_event_member_role(
db: Session,
event_id: UUID,
user_id: UUID,
role: str
) -> Optional[models.EventMember]:
"""Update member's role"""
member = get_event_member(db, event_id, user_id)
if member:
member.role = getattr(models.RoleEnum, role) if isinstance(role, str) else role
db.commit()
db.refresh(member)
return member
def remove_event_member(db: Session, event_id: UUID, user_id: UUID) -> bool:
"""Remove user from event"""
member = get_event_member(db, event_id, user_id)
if member:
db.delete(member)
db.commit()
return True
return False
# ============================================
# Guest CRUD (Event-Scoped)
# ============================================
def create_guest(
db: Session,
event_id: UUID,
guest: schemas.GuestCreate,
added_by_user_id: UUID
) -> models.Guest:
"""Create a guest for an event"""
db_guest = models.Guest(
event_id=event_id,
added_by_user_id=added_by_user_id,
**guest.model_dump()
)
db.add(db_guest)
db.commit()
db.refresh(db_guest)
return db_guest
def get_guest(db: Session, guest_id: UUID, event_id: UUID) -> Optional[models.Guest]:
"""Get guest (verify it belongs to event)"""
return db.query(models.Guest).filter(
and_(
models.Guest.id == guest_id,
models.Guest.event_id == event_id
)
).first()
def get_guests(
db: Session,
event_id: UUID,
skip: int = 0,
limit: int = 1000
):
"""Get all guests for an event"""
return db.query(models.Guest).filter(
models.Guest.event_id == event_id
).offset(skip).limit(limit).all()
def search_guests(
db: Session,
event_id: UUID,
query: Optional[str] = None,
status: Optional[str] = None,
side: Optional[str] = None,
added_by_user_id: Optional[UUID] = None,
owner_email: Optional[str] = None
):
"""Search/filter guests for an event"""
db_query = db.query(models.Guest).filter(models.Guest.event_id == event_id)
if query:
search_pattern = f"%{query}%"
db_query = db_query.filter(
or_(
models.Guest.first_name.ilike(search_pattern),
models.Guest.last_name.ilike(search_pattern),
models.Guest.phone_number.ilike(search_pattern),
models.Guest.email.ilike(search_pattern)
)
)
if status:
db_query = db_query.filter(models.Guest.rsvp_status == status)
if side:
db_query = db_query.filter(models.Guest.side == side)
if added_by_user_id:
db_query = db_query.filter(models.Guest.added_by_user_id == added_by_user_id)
if owner_email:
if owner_email == "self-service":
db_query = db_query.filter(models.Guest.source == "self-service")
else:
db_query = db_query.filter(models.Guest.owner_email == owner_email)
return db_query.all()
def update_guest(
db: Session,
guest_id: UUID,
event_id: UUID,
guest: schemas.GuestUpdate
) -> Optional[models.Guest]:
"""Update guest (verify it belongs to event)"""
db_guest = get_guest(db, guest_id, event_id)
if db_guest:
update_data = guest.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_guest, field, value)
db.commit()
db.refresh(db_guest)
return db_guest
def delete_guest(db: Session, guest_id: UUID, event_id: UUID) -> bool:
"""Delete guest (verify it belongs to event)"""
db_guest = get_guest(db, guest_id, event_id)
if db_guest:
db.delete(db_guest)
db.commit()
return True
return False
def bulk_import_guests(
db: Session,
event_id: UUID,
guests: list[schemas.GuestImportItem],
added_by_user_id: UUID
) -> list[models.Guest]:
"""Import multiple guests at once"""
imported_guests = []
for guest_data in guests:
db_guest = models.Guest(
event_id=event_id,
added_by_user_id=added_by_user_id,
**guest_data.model_dump()
)
db.add(db_guest)
imported_guests.append(db_guest)
db.commit()
# Refresh all to get IDs and timestamps
for guest in imported_guests:
db.refresh(guest)
return imported_guests
def delete_guests_bulk(db: Session, event_id: UUID, guest_ids: list[UUID]) -> int:
"""Delete multiple guests"""
deleted_count = db.query(models.Guest).filter(
and_(
models.Guest.event_id == event_id,
models.Guest.id.in_(guest_ids)
)
).delete(synchronize_session=False)
db.commit()
return deleted_count
def get_guests_by_status(db: Session, event_id: UUID, status: str):
"""Get guests with specific status"""
return db.query(models.Guest).filter(
and_(
models.Guest.event_id == event_id,
models.Guest.rsvp_status == status
)
).all()
def get_guests_by_side(db: Session, event_id: UUID, side: str):
"""Get guests for a specific side"""
return db.query(models.Guest).filter(
and_(
models.Guest.event_id == event_id,
models.Guest.side == side
)
).all()
def get_guest_by_phone(db: Session, event_id: UUID, phone: str) -> Optional[models.Guest]:
"""Get guest by phone number (within event)"""
return db.query(models.Guest).filter(
and_(
models.Guest.event_id == event_id,
models.Guest.phone_number == phone
)
).first()
# ============================================
# Statistics and Analytics
# ============================================
def get_event_stats(db: Session, event_id: UUID):
"""Get summary stats for an event"""
total = db.query(func.count(models.Guest.id)).filter(
models.Guest.event_id == event_id
).scalar() or 0
confirmed = db.query(func.count(models.Guest.id)).filter(
and_(
models.Guest.event_id == event_id,
models.Guest.rsvp_status == "confirmed"
)
).scalar() or 0
declined = db.query(func.count(models.Guest.id)).filter(
and_(
models.Guest.event_id == event_id,
models.Guest.rsvp_status == "declined"
)
).scalar() or 0
invited = total - confirmed - declined
return {
"total": total,
"confirmed": confirmed,
"declined": declined,
"invited": invited,
"confirmation_rate": (confirmed / total * 100) if total > 0 else 0
}
def get_sides_summary(db: Session, event_id: UUID):
"""Get guest breakdown by side"""
sides = db.query(
models.Guest.side,
func.count(models.Guest.id).label('count')
).filter(
models.Guest.event_id == event_id
).group_by(models.Guest.side).all()
return [{"side": side, "count": count} for side, count in sides]