90 lines
3.0 KiB
Python
90 lines
3.0 KiB
Python
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy.orm import Session
|
|
from typing import List
|
|
from ..database.database import get_db
|
|
from ..models.brand import Brand
|
|
from ..models.user import User
|
|
from ..schemas.brand import BrandCreate, BrandUpdate, BrandResponse
|
|
from ..services.auth import get_current_user
|
|
|
|
# Router that collects every brand endpoint below under the /api/brands prefix.
router = APIRouter(
    tags=["brands"],
    prefix="/api/brands",
)
|
|
|
|
@router.get("", response_model=List[BrandResponse])
def list_brands(db: Session = Depends(get_db)):
    """Return every brand, ordered by name.

    Args:
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        All ``Brand`` rows, sorted by ``Brand.name``.
    """
    ordered_query = db.query(Brand).order_by(Brand.name)
    return ordered_query.all()
|
|
|
|
@router.get("/{brand_id}", response_model=BrandResponse)
def get_brand(brand_id: int, db: Session = Depends(get_db)):
    """Look up a single brand by its id.

    Args:
        brand_id: Identifier taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The matching ``Brand``.

    Raises:
        HTTPException: 404 when no brand has the given id.
    """
    by_id = db.query(Brand).filter(Brand.id == brand_id)
    found = by_id.first()
    if found:
        return found
    raise HTTPException(status_code=404, detail="Brand not found")
|
|
|
|
@router.post("", response_model=BrandResponse)
def create_brand(
    brand_data: BrandCreate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Create a new brand (admin only).

    Args:
        brand_data: Request payload; only its ``name`` is used.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Caller resolved by the ``get_current_user`` dependency.

    Returns:
        The newly persisted ``Brand``, refreshed from the database.

    Raises:
        HTTPException: 403 when the caller is not an admin; 400 when a brand
            with the same name already exists.
    """
    # Imported locally so this handler does not depend on edits elsewhere
    # in the file.
    from sqlalchemy.exc import IntegrityError

    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    # Fast path: reject an obvious duplicate before attempting the insert.
    existing = db.query(Brand).filter(Brand.name == brand_data.name).first()
    if existing:
        raise HTTPException(status_code=400, detail="Brand already exists")

    brand = Brand(name=brand_data.name)
    db.add(brand)
    try:
        db.commit()
    except IntegrityError as exc:
        # The check above is not atomic with the insert: a concurrent request
        # can create the same name in between. Roll back so the session stays
        # usable and report the conflict instead of a 500.
        # NOTE(review): assumes the violated constraint is uniqueness on
        # Brand.name - confirm against the model definition.
        db.rollback()
        raise HTTPException(status_code=400, detail="Brand already exists") from exc
    db.refresh(brand)
    return brand
|
|
|
|
@router.put("/{brand_id}", response_model=BrandResponse)
def update_brand(
    brand_id: int,
    brand_data: BrandUpdate,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Update a brand (admin only).

    Args:
        brand_id: Identifier of the brand to rename, taken from the URL path.
        brand_data: Request payload; only its ``name`` is used.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Caller resolved by the ``get_current_user`` dependency.

    Returns:
        The updated ``Brand``, refreshed from the database.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when no brand
            has the given id; 400 when the new name belongs to another brand.
    """
    # Imported locally so this handler does not depend on edits elsewhere
    # in the file.
    from sqlalchemy.exc import IntegrityError

    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    brand = db.query(Brand).filter(Brand.id == brand_id).first()
    if not brand:
        raise HTTPException(status_code=404, detail="Brand not found")

    # Only look for a conflict when the name actually changes; otherwise the
    # brand would collide with itself.
    if brand_data.name != brand.name:
        existing = db.query(Brand).filter(Brand.name == brand_data.name).first()
        if existing:
            raise HTTPException(status_code=400, detail="Brand name already exists")

    brand.name = brand_data.name
    try:
        db.commit()
    except IntegrityError as exc:
        # The conflict check is not atomic with the update: another request
        # may claim the name in between. Roll back so the session stays usable
        # and report the conflict instead of a 500.
        # NOTE(review): assumes the violated constraint is uniqueness on
        # Brand.name - confirm against the model definition.
        db.rollback()
        raise HTTPException(status_code=400, detail="Brand name already exists") from exc
    db.refresh(brand)
    return brand
|
|
|
|
@router.delete("/{brand_id}")
def delete_brand(
    brand_id: int,
    db: Session = Depends(get_db),
    current_user: User = Depends(get_current_user)
):
    """Remove a brand; restricted to admin users.

    Args:
        brand_id: Identifier of the brand to remove, taken from the URL path.
        db: Database session supplied by the ``get_db`` dependency.
        current_user: Caller resolved by the ``get_current_user`` dependency.

    Returns:
        A dict with a single ``message`` key confirming the deletion.

    Raises:
        HTTPException: 403 when the caller is not an admin; 404 when no brand
            has the given id.
    """
    # Authorization is checked before the database is touched.
    if not current_user.is_admin:
        raise HTTPException(status_code=403, detail="Admin access required")

    by_id = db.query(Brand).filter(Brand.id == brand_id)
    doomed = by_id.first()
    if not doomed:
        raise HTTPException(status_code=404, detail="Brand not found")

    db.delete(doomed)
    db.commit()

    confirmation = {"message": "Brand deleted successfully"}
    return confirmation
|