106 lines
3.2 KiB
Python
106 lines
3.2 KiB
Python
from fastapi import APIRouter, Depends, HTTPException
|
|
from sqlalchemy.orm import Session
|
|
from typing import List, Optional
|
|
from app.database.database import get_db
|
|
from app.models import Model, User
|
|
from app.schemas.model import ModelCreate, ModelUpdate, ModelResponse
|
|
from app.services.auth import get_current_admin_user
|
|
|
|
router = APIRouter(prefix="/api/models", tags=["models"])
|
|
|
|
|
|
@router.get("", response_model=List[ModelResponse])
|
|
def get_models(
|
|
category_id: Optional[int] = None,
|
|
brand: Optional[str] = None,
|
|
db: Session = Depends(get_db)
|
|
):
|
|
"""Get all models with optional filtering by category and brand"""
|
|
query = db.query(Model)
|
|
|
|
if category_id:
|
|
query = query.filter(Model.category_id == category_id)
|
|
if brand:
|
|
query = query.filter(Model.brand.ilike(f"%{brand}%"))
|
|
|
|
return query.all()
|
|
|
|
|
|
@router.get("/{model_id}", response_model=ModelResponse)
|
|
def get_model(model_id: int, db: Session = Depends(get_db)):
|
|
"""Get a specific model by ID"""
|
|
model = db.query(Model).filter(Model.id == model_id).first()
|
|
if not model:
|
|
raise HTTPException(status_code=404, detail="Model not found")
|
|
return model
|
|
|
|
|
|
@router.post("", response_model=ModelResponse)
|
|
def create_model(
|
|
model_data: ModelCreate,
|
|
db: Session = Depends(get_db),
|
|
admin: User = Depends(get_current_admin_user)
|
|
):
|
|
"""Create a new model (admin only)"""
|
|
# Check if model with same name, category, and brand already exists
|
|
existing = db.query(Model).filter(
|
|
Model.name == model_data.name,
|
|
Model.category_id == model_data.category_id,
|
|
Model.brand == model_data.brand
|
|
).first()
|
|
|
|
if existing:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail=f"Model '{model_data.name}' already exists for brand '{model_data.brand}' in this category"
|
|
)
|
|
|
|
db_model = Model(**model_data.dict())
|
|
db.add(db_model)
|
|
db.commit()
|
|
db.refresh(db_model)
|
|
return db_model
|
|
|
|
|
|
@router.put("/{model_id}", response_model=ModelResponse)
|
|
def update_model(
|
|
model_id: int,
|
|
model_update: ModelUpdate,
|
|
db: Session = Depends(get_db),
|
|
admin: User = Depends(get_current_admin_user)
|
|
):
|
|
"""Update a model (admin only)"""
|
|
model = db.query(Model).filter(Model.id == model_id).first()
|
|
if not model:
|
|
raise HTTPException(status_code=404, detail="Model not found")
|
|
|
|
for field, value in model_update.dict(exclude_unset=True).items():
|
|
setattr(model, field, value)
|
|
|
|
db.commit()
|
|
db.refresh(model)
|
|
return model
|
|
|
|
|
|
@router.delete("/{model_id}")
|
|
def delete_model(
|
|
model_id: int,
|
|
db: Session = Depends(get_db),
|
|
admin: User = Depends(get_current_admin_user)
|
|
):
|
|
"""Delete a model (admin only)"""
|
|
model = db.query(Model).filter(Model.id == model_id).first()
|
|
if not model:
|
|
raise HTTPException(status_code=404, detail="Model not found")
|
|
|
|
db.delete(model)
|
|
db.commit()
|
|
return {"message": "Model deleted successfully"}
|
|
|
|
|
|
@router.get("/brands/list")
|
|
def get_brands(db: Session = Depends(get_db)):
|
|
"""Get list of unique brands"""
|
|
brands = db.query(Model.brand).distinct().all()
|
|
return [brand[0] for brand in brands if brand[0]]
|