from typing import List, Dict, Any import os import secrets from pathlib import Path from fastapi import APIRouter, Depends, HTTPException, UploadFile, File from sqlalchemy.orm import Session from sqlalchemy import func from app.core.config import settings from app.deps import get_db, require_admin, get_current_user from app.models import Course, Module, QuizQuestion, QuizChoice, CourseGroup, Group, GroupMembership, User, UserRole, Upload, Enrollment, ModuleProgress, QuizAttempt, ProgressStatus from app.schemas import ( AdminCourseCreate, AdminCourseOut, AdminCourseUpdate, AdminUserOut, AdminUserUpdate, GroupCreate, GroupOut, GroupUpdate, GroupMemberAdd, ModuleCreate, ModuleOut, ModuleUpdate, QuizQuestionCreate, QuizQuestionOut, QuizQuestionUpdate, UploadOut, ) router = APIRouter(prefix="/admin", tags=["admin"], dependencies=[Depends(require_admin)]) @router.get("/courses", response_model=List[AdminCourseOut]) def list_courses(db: Session = Depends(get_db)): courses = db.query(Course).order_by(Course.created_at.desc()).all() results = [] for course in courses: group_ids = [cg.group_id for cg in course.course_groups] results.append( AdminCourseOut( id=course.id, title=course.title, description=course.description, is_published=course.is_published, created_at=course.created_at, updated_at=course.updated_at, group_ids=group_ids, ) ) return results @router.post("/courses", response_model=AdminCourseOut) def create_course(payload: AdminCourseCreate, db: Session = Depends(get_db)): course = Course(**payload.model_dump(exclude={"group_ids"})) db.add(course) db.commit() db.refresh(course) if payload.group_ids: for gid in payload.group_ids: db.add(CourseGroup(course_id=course.id, group_id=gid)) db.commit() db.refresh(course) return AdminCourseOut( id=course.id, title=course.title, description=course.description, is_published=course.is_published, created_at=course.created_at, updated_at=course.updated_at, group_ids=[cg.group_id for cg in course.course_groups], ) @router.put("/courses/{course_id}", response_model=AdminCourseOut) def update_course(course_id: int, payload: AdminCourseUpdate, db: Session = Depends(get_db)): course = db.get(Course, course_id) if not course: raise HTTPException(status_code=404, detail="Course not found") data = payload.model_dump(exclude_unset=True) group_ids = data.pop("group_ids", None) for key, value in data.items(): setattr(course, key, value) if group_ids is not None: course.course_groups.clear() for gid in group_ids: course.course_groups.append(CourseGroup(group_id=gid)) db.commit() db.refresh(course) return AdminCourseOut( id=course.id, title=course.title, description=course.description, is_published=course.is_published, created_at=course.created_at, updated_at=course.updated_at, group_ids=[cg.group_id for cg in course.course_groups], ) @router.delete("/courses/{course_id}") def delete_course(course_id: int, db: Session = Depends(get_db)): course = db.get(Course, course_id) if not course: raise HTTPException(status_code=404, detail="Course not found") db.delete(course) db.commit() return {"ok": True} @router.patch("/courses/{course_id}/publish", response_model=AdminCourseOut) def toggle_publish(course_id: int, db: Session = Depends(get_db)): course = db.get(Course, course_id) if not course: raise HTTPException(status_code=404, detail="Course not found") course.is_published = not course.is_published db.commit() db.refresh(course) return AdminCourseOut( id=course.id, title=course.title, description=course.description, is_published=course.is_published, created_at=course.created_at, updated_at=course.updated_at, group_ids=[cg.group_id for cg in course.course_groups], ) @router.get("/courses/{course_id}/modules", response_model=List[ModuleOut]) def list_modules(course_id: int, db: Session = Depends(get_db)): return ( db.query(Module) .filter(Module.course_id == course_id) .order_by(Module.order_index) .all() ) @router.post("/courses/{course_id}/modules", response_model=ModuleOut) def create_module(course_id: int, payload: ModuleCreate, db: Session = Depends(get_db)): course = db.get(Course, course_id) if not course: raise HTTPException(status_code=404, detail="Course not found") module = Module(course_id=course_id, **payload.model_dump()) db.add(module) db.commit() db.refresh(module) return module @router.put("/modules/{module_id}", response_model=ModuleOut) def update_module(module_id: int, payload: ModuleUpdate, db: Session = Depends(get_db)): module = db.get(Module, module_id) if not module: raise HTTPException(status_code=404, detail="Module not found") for key, value in payload.model_dump(exclude_unset=True).items(): setattr(module, key, value) db.commit() db.refresh(module) return module @router.get("/modules/{module_id}", response_model=ModuleOut) def get_module(module_id: int, db: Session = Depends(get_db)): module = db.get(Module, module_id) if not module: raise HTTPException(status_code=404, detail="Module not found") return module @router.get("/users", response_model=List[AdminUserOut]) def list_users(q: str | None = None, db: Session = Depends(get_db)): query = db.query(User) if q: query = query.filter(User.email.ilike(f"%{q}%")) return query.order_by(User.created_at.desc()).all() @router.get("/users/{user_id}", response_model=AdminUserOut) def get_user(user_id: int, db: Session = Depends(get_db)): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") return user @router.patch("/users/{user_id}", response_model=AdminUserOut) def update_user(user_id: int, payload: AdminUserUpdate, db: Session = Depends(get_db)): user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") data = payload.model_dump(exclude_unset=True) if "role" in data and data["role"] in [UserRole.admin, UserRole.learner, "admin", "learner"]: user.role = UserRole(data["role"]) if "is_active" in data: user.is_active = data["is_active"] db.commit() db.refresh(user) return user @router.get("/groups", response_model=List[GroupOut]) def list_groups(db: Session = Depends(get_db)): return db.query(Group).order_by(Group.created_at.desc()).all() @router.post("/groups", response_model=GroupOut) def create_group(payload: GroupCreate, db: Session = Depends(get_db)): existing = db.query(Group).filter(Group.name == payload.name).first() if existing: raise HTTPException(status_code=400, detail="Group name already exists") group = Group(name=payload.name) db.add(group) db.commit() db.refresh(group) return group @router.put("/groups/{group_id}", response_model=GroupOut) def update_group(group_id: int, payload: GroupUpdate, db: Session = Depends(get_db)): group = db.get(Group, group_id) if not group: raise HTTPException(status_code=404, detail="Group not found") for key, value in payload.model_dump(exclude_unset=True).items(): setattr(group, key, value) db.commit() db.refresh(group) return group @router.delete("/groups/{group_id}") def delete_group(group_id: int, db: Session = Depends(get_db)): group = db.get(Group, group_id) if not group: raise HTTPException(status_code=404, detail="Group not found") db.delete(group) db.commit() return {"ok": True} @router.get("/groups/{group_id}/members", response_model=List[AdminUserOut]) def list_group_members(group_id: int, db: Session = Depends(get_db)): group = db.get(Group, group_id) if not group: raise HTTPException(status_code=404, detail="Group not found") members = ( db.query(User) .join(GroupMembership, GroupMembership.user_id == User.id) .filter(GroupMembership.group_id == group_id) .all() ) return members @router.post("/groups/{group_id}/members") def add_group_member(group_id: int, payload: GroupMemberAdd, db: Session = Depends(get_db)): group = db.get(Group, group_id) if not group: raise HTTPException(status_code=404, detail="Group not found") user = None if payload.user_id: user = db.get(User, payload.user_id) if not user and payload.email: user = db.query(User).filter(User.email == payload.email).first() if not user: raise HTTPException(status_code=404, detail="User not found") existing = ( db.query(GroupMembership) .filter(GroupMembership.group_id == group_id, GroupMembership.user_id == user.id) .first() ) if existing: return {"ok": True} db.add(GroupMembership(group_id=group_id, user_id=user.id)) db.commit() return {"ok": True} @router.delete("/groups/{group_id}/members/{user_id}") def remove_group_member(group_id: int, user_id: int, db: Session = Depends(get_db)): membership = ( db.query(GroupMembership) .filter(GroupMembership.group_id == group_id, GroupMembership.user_id == user_id) .first() ) if not membership: raise HTTPException(status_code=404, detail="Membership not found") db.delete(membership) db.commit() return {"ok": True} @router.delete("/modules/{module_id}") def delete_module(module_id: int, db: Session = Depends(get_db)): module = db.get(Module, module_id) if not module: raise HTTPException(status_code=404, detail="Module not found") db.delete(module) db.commit() return {"ok": True} @router.get("/modules/{module_id}/questions", response_model=List[QuizQuestionOut]) def list_questions(module_id: int, db: Session = Depends(get_db)): return ( db.query(QuizQuestion) .filter(QuizQuestion.module_id == module_id) .all() ) @router.post("/modules/{module_id}/questions", response_model=QuizQuestionOut) def create_question(module_id: int, payload: QuizQuestionCreate, db: Session = Depends(get_db)): module = db.get(Module, module_id) if not module: raise HTTPException(status_code=404, detail="Module not found") question = QuizQuestion(module_id=module_id, prompt=payload.prompt, question_type=payload.question_type) db.add(question) db.flush() for choice in payload.choices: db.add(QuizChoice(question_id=question.id, text=choice.text, is_correct=choice.is_correct)) db.commit() db.refresh(question) return question @router.put("/questions/{question_id}", response_model=QuizQuestionOut) def update_question(question_id: int, payload: QuizQuestionUpdate, db: Session = Depends(get_db)): question = db.get(QuizQuestion, question_id) if not question: raise HTTPException(status_code=404, detail="Question not found") data = payload.model_dump(exclude_unset=True) if "prompt" in data: question.prompt = data["prompt"] if "question_type" in data: question.question_type = data["question_type"] if "choices" in data: question.choices.clear() for choice in data["choices"] or []: question.choices.append(QuizChoice(text=choice.text, is_correct=choice.is_correct)) db.commit() db.refresh(question) return question @router.delete("/questions/{question_id}") def delete_question(question_id: int, db: Session = Depends(get_db)): question = db.get(QuizQuestion, question_id) if not question: raise HTTPException(status_code=404, detail="Question not found") db.delete(question) db.commit() return {"ok": True} @router.post("/uploads", response_model=UploadOut) async def upload_file(file: UploadFile = File(...), db: Session = Depends(get_db), user: User = Depends(get_current_user)): # Validate file type (PDF or PPTX only) allowed_types = ["application/pdf", "application/vnd.openxmlformats-officedocument.presentationml.presentation", "application/vnd.ms-powerpoint"] if file.content_type not in allowed_types: raise HTTPException(status_code=400, detail="Only PDF and PPTX files are allowed") # Ensure upload directory exists upload_dir = Path(settings.upload_dir) upload_dir.mkdir(parents=True, exist_ok=True) # Generate unique filename file_ext = Path(file.filename).suffix unique_name = f"{secrets.token_urlsafe(16)}{file_ext}" file_path = upload_dir / unique_name # Save file content = await file.read() with open(file_path, "wb") as f: f.write(content) # Store metadata in database upload = Upload( filename=file.filename, content_type=file.content_type, file_path=str(file_path), size_bytes=len(content), uploaded_by=user.id, ) db.add(upload) db.commit() db.refresh(upload) return upload @router.get("/uploads/{upload_id}") def get_upload(upload_id: int, db: Session = Depends(get_db)): upload = db.get(Upload, upload_id) if not upload: raise HTTPException(status_code=404, detail="Upload not found") return upload @router.get("/stats/users") def stats_users(db: Session = Depends(get_db)) -> List[Dict[str, Any]]: """Get statistics for all users""" users = db.query(User).filter(User.role == UserRole.learner).all() result = [] for user in users: enrollments_count = db.query(func.count(Enrollment.id)).filter(Enrollment.user_id == user.id).scalar() completed_count = ( db.query(func.count(func.distinct(ModuleProgress.module_id))) .join(Module, Module.id == ModuleProgress.module_id) .filter( ModuleProgress.user_id == user.id, ModuleProgress.status == ProgressStatus.completed, ) .scalar() ) attempts_count = db.query(func.count(QuizAttempt.id)).filter(QuizAttempt.user_id == user.id).scalar() result.append({ "id": user.id, "email": user.email, "enrollments_count": enrollments_count, "completed_modules_count": completed_count, "attempts_count": attempts_count, "created_at": user.created_at, }) return result @router.get("/stats/users/{user_id}") def stats_user(user_id: int, db: Session = Depends(get_db)) -> Dict[str, Any]: """Get detailed statistics for a specific user""" user = db.get(User, user_id) if not user: raise HTTPException(status_code=404, detail="User not found") enrollments = db.query(Enrollment).filter(Enrollment.user_id == user_id).all() courses_data = [] for enrollment in enrollments: course = db.get(Course, enrollment.course_id) if not course: continue modules = db.query(Module).filter(Module.course_id == course.id).order_by(Module.order_index).all() progress_map = { p.module_id: p for p in db.query(ModuleProgress).filter(ModuleProgress.user_id == user_id).all() } completed = [p for p in progress_map.values() if p.status == ProgressStatus.completed] total_modules = len(modules) completed_modules = len([m for m in modules if progress_map.get(m.id) and progress_map.get(m.id).status == ProgressStatus.completed]) progress_percent = (completed_modules / total_modules * 100) if total_modules > 0 else 0 # Get quiz attempts for this course attempts_data = [] for module in modules: if module.type.value == "quiz": attempts = ( db.query(QuizAttempt) .filter(QuizAttempt.user_id == user_id, QuizAttempt.module_id == module.id) .order_by(QuizAttempt.submitted_at.desc()) .all() ) for attempt in attempts: attempts_data.append({ "module_id": module.id, "module_title": module.title, "score": attempt.score, "passed": attempt.passed, "submitted_at": attempt.submitted_at, }) courses_data.append({ "course_id": course.id, "course_title": course.title, "enrolled_at": enrollment.enrolled_at, "progress_percent": progress_percent, "completed_modules": completed_modules, "total_modules": total_modules, "attempts": attempts_data, }) return { "user_id": user_id, "email": user.email, "courses": courses_data, } @router.get("/stats/courses") def stats_courses(db: Session = Depends(get_db)) -> List[Dict[str, Any]]: """Get statistics for all courses""" courses = db.query(Course).all() result = [] for course in courses: enrollments_count = db.query(func.count(Enrollment.id)).filter(Enrollment.course_id == course.id).scalar() # Count users who completed all modules modules = db.query(Module).filter(Module.course_id == course.id).all() total_modules = len(modules) completions_count = 0 if total_modules > 0: enrollments = db.query(Enrollment).filter(Enrollment.course_id == course.id).all() for enrollment in enrollments: completed_modules = ( db.query(func.count(ModuleProgress.id)) .join(Module, Module.id == ModuleProgress.module_id) .filter( Module.course_id == course.id, ModuleProgress.user_id == enrollment.user_id, ModuleProgress.status == ProgressStatus.completed, ) .scalar() ) if completed_modules >= total_modules: completions_count += 1 # Get final exam average (last module if it's a quiz) final_exam_avg = None if modules: last_module = modules[-1] if last_module.type.value == "quiz": avg_score = db.query(func.avg(QuizAttempt.score)).filter(QuizAttempt.module_id == last_module.id).scalar() final_exam_avg = float(avg_score) if avg_score else None result.append({ "id": course.id, "title": course.title, "enrollments_count": enrollments_count, "completions_count": completions_count, "final_exam_avg_score": final_exam_avg, }) return result @router.get("/stats/courses/{course_id}") def stats_course(course_id: int, db: Session = Depends(get_db)) -> Dict[str, Any]: """Get detailed statistics for a specific course""" course = db.get(Course, course_id) if not course: raise HTTPException(status_code=404, detail="Course not found") enrollments = db.query(Enrollment).filter(Enrollment.course_id == course_id).all() modules = db.query(Module).filter(Module.course_id == course_id).order_by(Module.order_index).all() attempts_distribution = [] for module in modules: if module.type.value == "quiz": attempts = db.query(QuizAttempt).filter(QuizAttempt.module_id == module.id).all() attempts_distribution.append({ "module_id": module.id, "module_title": module.title, "total_attempts": len(attempts), "avg_score": sum(a.score for a in attempts) / len(attempts) if attempts else 0, "pass_rate": sum(1 for a in attempts if a.passed) / len(attempts) * 100 if attempts else 0, }) return { "course_id": course.id, "title": course.title, "enrollments_count": len(enrollments), "total_modules": len(modules), "attempts_distribution": attempts_distribution, }